1use crate::blocks::Tipset;
5use crate::db::car::forest::{
6 FOREST_CAR_FILE_EXTENSION, TEMP_FOREST_CAR_FILE_EXTENSION, new_forest_car_temp_path_in,
7};
8use crate::db::car::{ForestCar, ManyCar};
9use crate::interpreter::VMTrace;
10use crate::networks::ChainConfig;
11use crate::rpc::sync::SnapshotProgressTracker;
12use crate::shim::clock::ChainEpoch;
13use crate::state_manager::{NO_CALLBACK, StateManager};
14use crate::utils::db::car_stream::CarStream;
15use crate::utils::io::EitherMmapOrRandomAccessFile;
16use crate::utils::net::{DownloadFileOption, download_to};
17use anyhow::{Context, bail};
18use futures::TryStreamExt;
19use serde::{Deserialize, Serialize};
20use std::{
21 ffi::OsStr,
22 fs,
23 path::{Path, PathBuf},
24 sync::Arc,
25 time,
26};
27use tokio::io::AsyncWriteExt;
28use tracing::{debug, info, warn};
29use url::Url;
30use walkdir::WalkDir;
31
32#[cfg(doc)]
33use crate::rpc::eth::types::EthHash;
34
35#[cfg(doc)]
36use crate::blocks::TipsetKey;
37
38#[cfg(doc)]
39use cid::Cid;
40
41pub fn load_all_forest_cars_with_cleanup<T>(
43 store: &ManyCar<T>,
44 forest_car_db_dir: &Path,
45) -> anyhow::Result<()> {
46 load_all_forest_cars_internal(store, forest_car_db_dir, true)
47}
48
49pub fn load_all_forest_cars<T>(store: &ManyCar<T>, forest_car_db_dir: &Path) -> anyhow::Result<()> {
51 load_all_forest_cars_internal(store, forest_car_db_dir, false)
52}
53
54fn load_all_forest_cars_internal<T>(
55 store: &ManyCar<T>,
56 forest_car_db_dir: &Path,
57 cleanup: bool,
58) -> anyhow::Result<()> {
59 if !forest_car_db_dir.is_dir() {
60 fs::create_dir_all(forest_car_db_dir)?;
61 }
62 for file in WalkDir::new(forest_car_db_dir)
63 .max_depth(1)
64 .into_iter()
65 .filter_map(|e| {
66 e.ok().and_then(|e| {
67 if !e.file_type().is_dir() {
68 Some(e.into_path())
69 } else {
70 None
71 }
72 })
73 })
74 {
75 if let Some(filename) = file.file_name().and_then(OsStr::to_str) {
76 if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
77 let car = ForestCar::try_from(file.as_path())
78 .with_context(|| format!("Error loading car DB at {}", file.display()))?;
79 store.read_only(car.into())?;
80 debug!("Loaded car DB at {}", file.display());
81 } else if cleanup && filename.ends_with(TEMP_FOREST_CAR_FILE_EXTENSION) {
82 match std::fs::remove_file(&file) {
84 Ok(_) => {
85 info!("Deleted temp car DB at {}", file.display());
86 }
87 Err(e) => {
88 warn!("Failed to delete temp car DB at {}: {e}", file.display());
89 }
90 }
91 }
92 }
93 }
94
95 tracing::info!("Loaded {} CARs", store.len());
96
97 Ok(())
98}
99
100#[derive(
101 Default,
102 PartialEq,
103 Eq,
104 Debug,
105 Clone,
106 Copy,
107 strum::Display,
108 strum::EnumString,
109 Serialize,
110 Deserialize,
111)]
112#[strum(serialize_all = "lowercase")]
113#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
114pub enum ImportMode {
115 #[default]
116 Auto,
118 Copy,
120 Move,
122 Symlink,
124 Hardlink,
126}
127
128pub async fn import_chain_as_forest_car(
131 from_path: &Path,
132 forest_car_db_dir: &Path,
133 import_mode: ImportMode,
134 rpc_endpoint: Url,
135 f3_root: &Path,
136 chain_config: &ChainConfig,
137 snapshot_progress_tracker: &SnapshotProgressTracker,
138) -> anyhow::Result<(PathBuf, Tipset)> {
139 info!("Importing chain from snapshot at: {}", from_path.display());
140
141 let stopwatch = time::Instant::now();
142
143 let forest_car_db_path = forest_car_db_dir.join(format!(
144 "{}{FOREST_CAR_FILE_EXTENSION}",
145 chrono::Utc::now().timestamp_millis()
146 ));
147
148 let move_or_copy = |mode: ImportMode| {
149 let forest_car_db_path = forest_car_db_path.clone();
150 async move {
151 let downloaded_car_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
152 if let Ok(url) = Url::parse(&from_path.display().to_string()) {
153 download_to(
154 &url,
155 &downloaded_car_temp_path,
156 DownloadFileOption::Resumable,
157 snapshot_progress_tracker.create_callback(),
158 )
159 .await?;
160
161 snapshot_progress_tracker.completed();
162 } else {
163 snapshot_progress_tracker.not_required();
164 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
165 move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
166 } else {
167 transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?;
169 if mode == ImportMode::Move {
170 std::fs::remove_file(from_path).context("Error removing original file")?;
171 }
172 }
173 }
174
175 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(
176 &downloaded_car_temp_path,
177 )?) {
178 downloaded_car_temp_path.persist(&forest_car_db_path)?;
179 } else {
180 let forest_car_db_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
182 transcode_into_forest_car(&downloaded_car_temp_path, &forest_car_db_temp_path)
183 .await?;
184 forest_car_db_temp_path.persist(&forest_car_db_path)?;
185 }
186 anyhow::Ok(())
187 }
188 };
189
190 match import_mode {
191 ImportMode::Auto => {
192 if Url::parse(&from_path.display().to_string()).is_ok() {
193 move_or_copy(ImportMode::Move).await?;
195 } else if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
196 tracing::info!(
197 "Hardlinking {} to {}",
198 from_path.display(),
199 forest_car_db_path.display()
200 );
201 if std::fs::hard_link(from_path, &forest_car_db_path).is_err() {
202 tracing::warn!("Error creating hardlink, fallback to copy");
203 move_or_copy(ImportMode::Copy).await?;
204 }
205 } else {
206 tracing::warn!(
207 "Snapshot file is not a valid forest.car.zst file, fallback to copy"
208 );
209 move_or_copy(ImportMode::Copy).await?;
210 }
211 }
212 ImportMode::Copy | ImportMode::Move => {
213 move_or_copy(import_mode).await?;
214 }
215 ImportMode::Symlink => {
216 let from_path = std::path::absolute(from_path)?;
217 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(&from_path)?) {
218 tracing::info!(
219 "Symlinking {} to {}",
220 from_path.display(),
221 forest_car_db_path.display()
222 );
223 std::os::unix::fs::symlink(from_path, &forest_car_db_path)
224 .context("Error creating symlink")?;
225 } else {
226 bail!("Snapshot file must be a valid forest.car.zst file");
227 }
228 }
229 ImportMode::Hardlink => {
230 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
231 tracing::info!(
232 "Hardlinking {} to {}",
233 from_path.display(),
234 forest_car_db_path.display()
235 );
236 std::fs::hard_link(from_path, &forest_car_db_path)
237 .context("Error creating hardlink")?;
238 } else {
239 bail!("Snapshot file must be a valid forest.car.zst file");
240 }
241 }
242 };
243
244 let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?;
245
246 if let Some(f3_cid) = forest_car.metadata().as_ref().and_then(|m| m.f3_data) {
247 if crate::f3::get_f3_sidecar_params(chain_config)
248 .initial_power_table
249 .is_none()
250 {
251 tracing::warn!(
253 "skipped importing F3 data as the initial power table CID is not set in the current manifest"
254 );
255 } else {
256 let mut f3_data = forest_car
257 .get_reader(f3_cid)?
258 .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
259 let mut temp_f3_snap = tempfile::Builder::new()
260 .suffix(".f3snap.bin")
261 .tempfile_in(forest_car_db_dir)?;
262 {
263 let f = temp_f3_snap.as_file_mut();
264 std::io::copy(&mut f3_data, f)?;
265 f.sync_all()?;
266 }
267 if let Err(e) = crate::f3::import_f3_snapshot(
268 chain_config,
269 rpc_endpoint.to_string(),
270 f3_root.display().to_string(),
271 temp_f3_snap.path().display().to_string(),
272 ) {
273 tracing::error!("Failed to import F3 snapshot: {e}");
275 }
276 }
277 }
278
279 let ts = forest_car.heaviest_tipset()?;
280 info!(
281 "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}",
282 stopwatch.elapsed().as_secs(),
283 ts.epoch(),
284 ts.key()
285 );
286
287 Ok((forest_car_db_path, ts))
288}
289
290fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow::Result<()> {
291 match import_mode {
292 ImportMode::Move => {
293 tracing::info!("Moving {} to {}", from.display(), to.display());
294 if fs::rename(from, to).is_ok() {
295 Ok(())
296 } else {
297 fs::copy(from, to).context("Error copying file")?;
298 fs::remove_file(from).context("Error removing original file")?;
299 Ok(())
300 }
301 }
302 ImportMode::Copy => {
303 tracing::info!("Copying {} to {}", from.display(), to.display());
304 fs::copy(from, to).map(|_| ()).context("Error copying file")
305 }
306 m => {
307 bail!("{m} must be handled elsewhere");
308 }
309 }
310}
311
312async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> {
313 tracing::info!(
314 from = %from.display(),
315 to = %to.display(),
316 "transcoding into forest car"
317 );
318 let car_stream = CarStream::new_from_path(from).await?;
319 let roots = car_stream.header_v1.roots.clone();
320
321 let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
322 let frames = crate::db::car::forest::Encoder::compress_stream_default(
323 car_stream.map_err(anyhow::Error::from),
324 );
325 crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
326 writer.shutdown().await?;
327
328 Ok(())
329}
330
331async fn process_ts<DB>(
332 ts: &Tipset,
333 state_manager: &Arc<StateManager<DB>>,
334 delegated_messages: &mut Vec<(crate::message::SignedMessage, u64)>,
335) -> anyhow::Result<()>
336where
337 DB: fvm_ipld_blockstore::Blockstore + Send + Sync + 'static,
338{
339 let epoch = ts.epoch();
340 let tsk = ts.key().clone();
341
342 state_manager
343 .compute_tipset_state(ts.clone(), NO_CALLBACK, VMTrace::NotTraced)
344 .await?;
345
346 delegated_messages.append(
347 &mut state_manager
348 .chain_store()
349 .headers_delegated_messages(ts.block_headers().iter())?,
350 );
351 tracing::trace!("Indexing tipset @{}: {}", epoch, &tsk);
352 state_manager.chain_store().put_tipset_key(&tsk)?;
353
354 Ok(())
355}
356
357pub enum RangeSpec {
358 To(ChainEpoch),
359 NumTipsets(usize),
360}
361
362impl std::fmt::Display for RangeSpec {
363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364 match self {
365 RangeSpec::To(epoch) => write!(f, "To epoch: {}", epoch),
366 RangeSpec::NumTipsets(n) => write!(f, "Tipsets: {}", n),
367 }
368 }
369}
370
371pub async fn backfill_db<DB>(
380 state_manager: &Arc<StateManager<DB>>,
381 head_ts: &Tipset,
382 spec: RangeSpec,
383) -> anyhow::Result<()>
384where
385 DB: fvm_ipld_blockstore::Blockstore + Send + Sync + 'static,
386{
387 tracing::info!("Starting index backfill...");
388
389 let mut delegated_messages = vec![];
390
391 let mut num_backfills = 0;
392
393 match spec {
394 RangeSpec::To(to_epoch) => {
395 for ts in head_ts
396 .clone()
397 .chain(&state_manager.chain_store().blockstore())
398 .take_while(|ts| ts.epoch() >= to_epoch)
399 {
400 process_ts(&ts, state_manager, &mut delegated_messages).await?;
401 num_backfills += 1;
402 }
403 }
404 RangeSpec::NumTipsets(n_tipsets) => {
405 for ts in head_ts
406 .clone()
407 .chain(&state_manager.chain_store().blockstore())
408 .take(n_tipsets)
409 {
410 process_ts(&ts, state_manager, &mut delegated_messages).await?;
411 num_backfills += 1;
412 }
413 }
414 }
415
416 state_manager
417 .chain_store()
418 .process_signed_messages(&delegated_messages)?;
419
420 tracing::info!("Total successful backfills: {}", num_backfills);
421
422 Ok(())
423}
424
425#[cfg(test)]
426mod test {
427 use super::*;
428
429 #[tokio::test]
430 async fn import_snapshot_from_file_valid() {
431 for import_mode in [ImportMode::Auto, ImportMode::Copy, ImportMode::Move] {
432 import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
433 .await
434 .unwrap();
435 }
436
437 for import_mode in [ImportMode::Symlink, ImportMode::Hardlink] {
439 import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
440 .await
441 .unwrap_err();
442 }
443 }
444
445 #[tokio::test]
446 async fn import_snapshot_from_compressed_file_valid() {
447 for import_mode in [ImportMode::Auto, ImportMode::Copy, ImportMode::Move] {
448 import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
449 .await
450 .unwrap();
451 }
452
453 for import_mode in [ImportMode::Symlink, ImportMode::Hardlink] {
455 import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
456 .await
457 .unwrap_err();
458 }
459 }
460
461 #[tokio::test]
462 async fn import_snapshot_from_forest_car_valid() {
463 for import_mode in [
464 ImportMode::Auto,
465 ImportMode::Copy,
466 ImportMode::Move,
467 ImportMode::Symlink,
468 ImportMode::Hardlink,
469 ] {
470 import_snapshot_from_file("test-snapshots/chain4.forest.car.zst", import_mode)
471 .await
472 .unwrap();
473 }
474 }
475
476 #[tokio::test]
477 async fn import_snapshot_from_file_invalid() {
478 for import_mode in &[
479 ImportMode::Auto,
480 ImportMode::Copy,
481 ImportMode::Move,
482 ImportMode::Symlink,
483 ImportMode::Hardlink,
484 ] {
485 import_snapshot_from_file("Cargo.toml", *import_mode)
486 .await
487 .unwrap_err();
488 }
489 }
490
491 #[tokio::test]
492 async fn import_snapshot_from_file_not_found() {
493 for import_mode in &[
494 ImportMode::Auto,
495 ImportMode::Copy,
496 ImportMode::Move,
497 ImportMode::Symlink,
498 ImportMode::Hardlink,
499 ] {
500 import_snapshot_from_file("dummy.car", *import_mode)
501 .await
502 .unwrap_err();
503 }
504 }
505
506 #[tokio::test]
507 async fn import_snapshot_from_url_not_found() {
508 for import_mode in &[
509 ImportMode::Auto,
510 ImportMode::Copy,
511 ImportMode::Move,
512 ImportMode::Symlink,
513 ImportMode::Hardlink,
514 ] {
515 import_snapshot_from_file("https://forest.chainsafe.io/dummy.car", *import_mode)
516 .await
517 .unwrap_err();
518 }
519 }
520
521 async fn import_snapshot_from_file(
522 file_path: &str,
523 import_mode: ImportMode,
524 ) -> anyhow::Result<()> {
525 let temp_file = tempfile::Builder::new().tempfile()?;
527 fs::copy(Path::new(file_path), temp_file.path())?;
528 let file_path = temp_file.path();
529
530 let temp_db_dir = tempfile::Builder::new().tempdir()?;
531
532 let (path, ts) = import_chain_as_forest_car(
533 file_path,
534 temp_db_dir.path(),
535 import_mode,
536 "http://127.0.0.1:2345/rpc/v1".parse().unwrap(),
537 Path::new("test"),
538 &ChainConfig::devnet(),
539 &SnapshotProgressTracker::default(),
540 )
541 .await?;
542 match import_mode {
543 ImportMode::Symlink => {
544 assert_eq!(
545 std::path::absolute(path.read_link()?)?,
546 std::path::absolute(file_path)?
547 );
548 }
549 ImportMode::Move => {
550 assert!(!file_path.exists());
551 assert!(path.is_file());
552 }
553 _ => {
554 assert!(file_path.is_file());
555 assert!(path.is_file());
556 }
557 }
558 assert!(ts.epoch() > 0);
559 Ok(())
560 }
561}