1use crate::blocks::Tipset;
5use crate::db::car::forest::{
6 FOREST_CAR_FILE_EXTENSION, TEMP_FOREST_CAR_FILE_EXTENSION, new_forest_car_temp_path_in,
7};
8use crate::db::car::{ForestCar, ManyCar};
9use crate::networks::ChainConfig;
10use crate::prelude::*;
11use crate::rpc::sync::SnapshotProgressTracker;
12use crate::shim::clock::ChainEpoch;
13use crate::state_manager::StateManager;
14use crate::utils::db::car_stream::CarStream;
15use crate::utils::io::EitherMmapOrRandomAccessFile;
16use crate::utils::net::{DownloadFileOption, download_to};
17use anyhow::{Context, bail};
18use futures::TryStreamExt;
19use serde::{Deserialize, Serialize};
20use std::{
21 ffi::OsStr,
22 fs,
23 path::{Path, PathBuf},
24 time,
25};
26use tokio::io::AsyncWriteExt;
27use tracing::{debug, info, warn};
28use url::Url;
29use walkdir::WalkDir;
30
31#[cfg(doc)]
32use crate::rpc::eth::types::EthHash;
33
34#[cfg(doc)]
35use crate::blocks::TipsetKey;
36
37#[cfg(doc)]
38use cid::Cid;
39
40pub fn load_all_forest_cars_with_cleanup<T>(
42 store: &ManyCar<T>,
43 forest_car_db_dir: &Path,
44) -> anyhow::Result<()> {
45 load_all_forest_cars_internal(store, forest_car_db_dir, true)
46}
47
48pub fn load_all_forest_cars<T>(store: &ManyCar<T>, forest_car_db_dir: &Path) -> anyhow::Result<()> {
50 load_all_forest_cars_internal(store, forest_car_db_dir, false)
51}
52
53fn load_all_forest_cars_internal<T>(
54 store: &ManyCar<T>,
55 forest_car_db_dir: &Path,
56 cleanup: bool,
57) -> anyhow::Result<()> {
58 if !forest_car_db_dir.is_dir() {
59 fs::create_dir_all(forest_car_db_dir)?;
60 }
61 for file in WalkDir::new(forest_car_db_dir)
62 .max_depth(1)
63 .into_iter()
64 .filter_map(|e| {
65 e.ok().and_then(|e| {
66 if !e.file_type().is_dir() {
67 Some(e.into_path())
68 } else {
69 None
70 }
71 })
72 })
73 {
74 if let Some(filename) = file.file_name().and_then(OsStr::to_str) {
75 if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
76 let car = ForestCar::try_from(file.as_path())
77 .with_context(|| format!("Error loading car DB at {}", file.display()))?;
78 store.read_only(car.into())?;
79 debug!("Loaded car DB at {}", file.display());
80 } else if cleanup && filename.ends_with(TEMP_FOREST_CAR_FILE_EXTENSION) {
81 match std::fs::remove_file(&file) {
83 Ok(_) => {
84 info!("Deleted temp car DB at {}", file.display());
85 }
86 Err(e) => {
87 warn!("Failed to delete temp car DB at {}: {e}", file.display());
88 }
89 }
90 }
91 }
92 }
93
94 tracing::info!("Loaded {} CARs", store.len());
95
96 Ok(())
97}
98
/// How a snapshot is brought into the forest CAR database directory by
/// [`import_chain_as_forest_car`].
///
/// Via `strum`, the mode is displayed as and parsed from its lowercase variant name
/// (e.g. `"hardlink"`).
#[derive(
    Default,
    PartialEq,
    Eq,
    Debug,
    Clone,
    Copy,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
)]
#[strum(serialize_all = "lowercase")]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub enum ImportMode {
    /// Pick automatically: URLs are downloaded; a local file that is already a valid forest
    /// CAR is hard-linked (falling back to a copy if linking fails); anything else is copied
    /// and transcoded into the forest CAR format.
    #[default]
    Auto,
    /// Copy the snapshot, transcoding it if it is not already a forest CAR. The source is kept.
    Copy,
    /// Move the snapshot (rename, or copy-then-delete), transcoding it if needed. The source
    /// file is removed.
    Move,
    /// Symlink to the snapshot's absolute path. Only accepted for a valid forest CAR; uses
    /// `std::os::unix::fs::symlink`.
    Symlink,
    /// Hard-link the snapshot. Only accepted for a valid forest CAR.
    Hardlink,
}
126
127pub async fn import_chain_as_forest_car(
130 from_path: &Path,
131 forest_car_db_dir: &Path,
132 import_mode: ImportMode,
133 rpc_endpoint: Url,
134 f3_root: &Path,
135 chain_config: &ChainConfig,
136 snapshot_progress_tracker: &SnapshotProgressTracker,
137) -> anyhow::Result<(PathBuf, Tipset)> {
138 info!("Importing chain from snapshot at: {}", from_path.display());
139
140 let stopwatch = time::Instant::now();
141
142 let forest_car_db_path = forest_car_db_dir.join(format!(
143 "{}{FOREST_CAR_FILE_EXTENSION}",
144 chrono::Utc::now().timestamp_millis()
145 ));
146
147 let move_or_copy = |mode: ImportMode| {
148 let forest_car_db_path = forest_car_db_path.clone();
149 async move {
150 let downloaded_car_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
151 if let Ok(url) = Url::parse(&from_path.display().to_string()) {
152 download_to(
153 &url,
154 &downloaded_car_temp_path,
155 DownloadFileOption::Resumable,
156 snapshot_progress_tracker.create_callback(),
157 )
158 .await?;
159
160 snapshot_progress_tracker.completed();
161 } else {
162 snapshot_progress_tracker.not_required();
163 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
164 move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
165 } else {
166 transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?;
168 if mode == ImportMode::Move {
169 std::fs::remove_file(from_path).context("Error removing original file")?;
170 }
171 }
172 }
173
174 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(
175 &downloaded_car_temp_path,
176 )?) {
177 downloaded_car_temp_path.persist(&forest_car_db_path)?;
178 } else {
179 let forest_car_db_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
181 transcode_into_forest_car(&downloaded_car_temp_path, &forest_car_db_temp_path)
182 .await?;
183 forest_car_db_temp_path.persist(&forest_car_db_path)?;
184 }
185 anyhow::Ok(())
186 }
187 };
188
189 match import_mode {
190 ImportMode::Auto => {
191 if Url::parse(&from_path.display().to_string()).is_ok() {
192 move_or_copy(ImportMode::Move).await?;
194 } else if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
195 tracing::info!(
196 "Hardlinking {} to {}",
197 from_path.display(),
198 forest_car_db_path.display()
199 );
200 if std::fs::hard_link(from_path, &forest_car_db_path).is_err() {
201 tracing::warn!("Error creating hardlink, fallback to copy");
202 move_or_copy(ImportMode::Copy).await?;
203 }
204 } else {
205 tracing::warn!(
206 "Snapshot file is not a valid forest.car.zst file, fallback to copy"
207 );
208 move_or_copy(ImportMode::Copy).await?;
209 }
210 }
211 ImportMode::Copy | ImportMode::Move => {
212 move_or_copy(import_mode).await?;
213 }
214 ImportMode::Symlink => {
215 let from_path = std::path::absolute(from_path)?;
216 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(&from_path)?) {
217 tracing::info!(
218 "Symlinking {} to {}",
219 from_path.display(),
220 forest_car_db_path.display()
221 );
222 std::os::unix::fs::symlink(from_path, &forest_car_db_path)
223 .context("Error creating symlink")?;
224 } else {
225 bail!("Snapshot file must be a valid forest.car.zst file");
226 }
227 }
228 ImportMode::Hardlink => {
229 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
230 tracing::info!(
231 "Hardlinking {} to {}",
232 from_path.display(),
233 forest_car_db_path.display()
234 );
235 std::fs::hard_link(from_path, &forest_car_db_path)
236 .context("Error creating hardlink")?;
237 } else {
238 bail!("Snapshot file must be a valid forest.car.zst file");
239 }
240 }
241 };
242
243 let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?;
244
245 if let Some(f3_cid) = forest_car.metadata().and_then(|m| m.f3_data) {
246 if crate::f3::get_f3_sidecar_params(chain_config)
247 .initial_power_table
248 .is_none()
249 {
250 tracing::warn!(
252 "skipped importing F3 data as the initial power table CID is not set in the current manifest"
253 );
254 } else {
255 let mut f3_data = forest_car
256 .get_reader(f3_cid)?
257 .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
258 let mut temp_f3_snap = tempfile::Builder::new()
259 .suffix(".f3snap.bin")
260 .tempfile_in(forest_car_db_dir)?;
261 {
262 let f = temp_f3_snap.as_file_mut();
263 std::io::copy(&mut f3_data, f)?;
264 f.sync_all()?;
265 }
266 if let Err(e) = crate::f3::import_f3_snapshot(
267 chain_config,
268 rpc_endpoint.to_string(),
269 f3_root.display().to_string(),
270 temp_f3_snap.path().display().to_string(),
271 ) {
272 tracing::error!("Failed to import F3 snapshot: {e:#}");
274 }
275 }
276 }
277
278 let ts = forest_car.heaviest_tipset()?;
279 info!(
280 "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}",
281 stopwatch.elapsed().as_secs(),
282 ts.epoch(),
283 ts.key()
284 );
285
286 Ok((forest_car_db_path, ts))
287}
288
289fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow::Result<()> {
290 match import_mode {
291 ImportMode::Move => {
292 tracing::info!("Moving {} to {}", from.display(), to.display());
293 if fs::rename(from, to).is_ok() {
294 Ok(())
295 } else {
296 fs::copy(from, to).context("Error copying file")?;
297 fs::remove_file(from).context("Error removing original file")?;
298 Ok(())
299 }
300 }
301 ImportMode::Copy => {
302 tracing::info!("Copying {} to {}", from.display(), to.display());
303 fs::copy(from, to).map(|_| ()).context("Error copying file")
304 }
305 m => {
306 bail!("{m} must be handled elsewhere");
307 }
308 }
309}
310
311async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> {
312 tracing::info!(
313 from = %from.display(),
314 to = %to.display(),
315 "transcoding into forest car"
316 );
317 let car_stream = CarStream::new_from_path(from).await?;
318 let roots = car_stream.header_v1.roots.clone();
319
320 let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
321 let frames = crate::db::car::forest::Encoder::compress_stream_default(
322 car_stream.map_err(anyhow::Error::from),
323 );
324 crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
325 writer.shutdown().await?;
326
327 Ok(())
328}
329
330async fn process_ts(
331 ts: &Tipset,
332 state_manager: &StateManager,
333 delegated_messages: &mut Vec<(crate::message::SignedMessage, u64)>,
334) -> anyhow::Result<()> {
335 let epoch = ts.epoch();
336 let tsk = ts.key().clone();
337
338 state_manager.load_executed_tipset(ts).await?;
339
340 delegated_messages.append(
341 &mut state_manager
342 .chain_store()
343 .headers_delegated_messages(ts.block_headers().iter())?,
344 );
345 tracing::trace!("Indexing tipset @{}: {}", epoch, &tsk);
346 tsk.save(state_manager.db())?;
347
348 Ok(())
349}
350
351pub enum RangeSpec {
352 To(ChainEpoch),
353 NumTipsets(usize),
354}
355
356impl std::fmt::Display for RangeSpec {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 match self {
359 RangeSpec::To(epoch) => write!(f, "To epoch: {}", epoch),
360 RangeSpec::NumTipsets(n) => write!(f, "Tipsets: {}", n),
361 }
362 }
363}
364
365pub async fn backfill_db(
374 state_manager: &StateManager,
375 head_ts: &Tipset,
376 spec: RangeSpec,
377) -> anyhow::Result<()> {
378 tracing::info!("Starting index backfill...");
379
380 let mut delegated_messages = vec![];
381
382 let mut num_backfills = 0;
383
384 match spec {
385 RangeSpec::To(to_epoch) => {
386 for ts in head_ts
387 .shallow_clone()
388 .chain(&state_manager.chain_store().db())
389 .take_while(|ts| ts.epoch() >= to_epoch)
390 {
391 process_ts(&ts, state_manager, &mut delegated_messages).await?;
392 num_backfills += 1;
393 }
394 }
395 RangeSpec::NumTipsets(n_tipsets) => {
396 for ts in head_ts
397 .shallow_clone()
398 .chain(&state_manager.chain_store().db())
399 .take(n_tipsets)
400 {
401 process_ts(&ts, state_manager, &mut delegated_messages).await?;
402 num_backfills += 1;
403 }
404 }
405 }
406
407 state_manager
408 .chain_store()
409 .process_signed_messages(&delegated_messages)?;
410
411 tracing::info!("Total successful backfills: {}", num_backfills);
412
413 Ok(())
414}
415
#[cfg(test)]
mod test {
    use super::*;

    #[tokio::test]
    async fn import_snapshot_from_file_valid() {
        for import_mode in [ImportMode::Auto, ImportMode::Copy, ImportMode::Move] {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap();
        }

        // Linking requires a valid forest CAR, which a plain CAR is not.
        for import_mode in [ImportMode::Symlink, ImportMode::Hardlink] {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_compressed_file_valid() {
        for import_mode in [ImportMode::Auto, ImportMode::Copy, ImportMode::Move] {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap();
        }

        // A zstd-compressed plain CAR is not a forest CAR either, so linking must fail.
        for import_mode in [ImportMode::Symlink, ImportMode::Hardlink] {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_forest_car_valid() {
        for import_mode in [
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("test-snapshots/chain4.forest.car.zst", import_mode)
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_invalid() {
        for import_mode in [
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("Cargo.toml", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_not_found() {
        for import_mode in [
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_url_not_found() {
        for import_mode in [
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("https://forest.chainsafe.io/dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    /// Imports a copy of `file_path` into a fresh temp DB dir with `import_mode` and checks
    /// the mode-specific effect on the source and the imported file.
    async fn import_snapshot_from_file(
        file_path: &str,
        import_mode: ImportMode,
    ) -> anyhow::Result<()> {
        // Work on a copy so that Move mode never deletes the checked-in fixture.
        let temp_file = tempfile::Builder::new().tempfile()?;
        fs::copy(Path::new(file_path), temp_file.path())?;
        let file_path = temp_file.path();

        let temp_db_dir = tempfile::Builder::new().tempdir()?;

        let (path, ts) = import_chain_as_forest_car(
            file_path,
            temp_db_dir.path(),
            import_mode,
            "http://127.0.0.1:2345/rpc/v1".parse().unwrap(),
            Path::new("test"),
            &ChainConfig::devnet(),
            &SnapshotProgressTracker::default(),
        )
        .await?;
        match import_mode {
            ImportMode::Symlink => {
                assert_eq!(
                    std::path::absolute(path.read_link()?)?,
                    std::path::absolute(file_path)?
                );
            }
            ImportMode::Move => {
                assert!(!file_path.exists());
                assert!(path.is_file());
            }
            _ => {
                assert!(file_path.is_file());
                assert!(path.is_file());
            }
        }
        assert!(ts.epoch() > 0);
        Ok(())
    }
}