1use crate::blocks::Tipset;
5use crate::db::car::forest::{
6 FOREST_CAR_FILE_EXTENSION, TEMP_FOREST_CAR_FILE_EXTENSION, new_forest_car_temp_path_in,
7};
8use crate::db::car::{ForestCar, ManyCar};
9use crate::db::{Blockstore, EthMappingsStore};
10use crate::networks::ChainConfig;
11use crate::rpc::sync::SnapshotProgressTracker;
12use crate::shim::clock::ChainEpoch;
13use crate::state_manager::StateManager;
14use crate::utils::ShallowClone as _;
15use crate::utils::db::car_stream::CarStream;
16use crate::utils::io::EitherMmapOrRandomAccessFile;
17use crate::utils::net::{DownloadFileOption, download_to};
18use anyhow::{Context, bail};
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use std::{
22 ffi::OsStr,
23 fs,
24 path::{Path, PathBuf},
25 sync::Arc,
26 time,
27};
28use tokio::io::AsyncWriteExt;
29use tracing::{debug, info, warn};
30use url::Url;
31use walkdir::WalkDir;
32
33#[cfg(doc)]
34use crate::rpc::eth::types::EthHash;
35
36#[cfg(doc)]
37use crate::blocks::TipsetKey;
38
39#[cfg(doc)]
40use cid::Cid;
41
42pub fn load_all_forest_cars_with_cleanup<T>(
44 store: &ManyCar<T>,
45 forest_car_db_dir: &Path,
46) -> anyhow::Result<()> {
47 load_all_forest_cars_internal(store, forest_car_db_dir, true)
48}
49
50pub fn load_all_forest_cars<T>(store: &ManyCar<T>, forest_car_db_dir: &Path) -> anyhow::Result<()> {
52 load_all_forest_cars_internal(store, forest_car_db_dir, false)
53}
54
55fn load_all_forest_cars_internal<T>(
56 store: &ManyCar<T>,
57 forest_car_db_dir: &Path,
58 cleanup: bool,
59) -> anyhow::Result<()> {
60 if !forest_car_db_dir.is_dir() {
61 fs::create_dir_all(forest_car_db_dir)?;
62 }
63 for file in WalkDir::new(forest_car_db_dir)
64 .max_depth(1)
65 .into_iter()
66 .filter_map(|e| {
67 e.ok().and_then(|e| {
68 if !e.file_type().is_dir() {
69 Some(e.into_path())
70 } else {
71 None
72 }
73 })
74 })
75 {
76 if let Some(filename) = file.file_name().and_then(OsStr::to_str) {
77 if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
78 let car = ForestCar::try_from(file.as_path())
79 .with_context(|| format!("Error loading car DB at {}", file.display()))?;
80 store.read_only(car.into())?;
81 debug!("Loaded car DB at {}", file.display());
82 } else if cleanup && filename.ends_with(TEMP_FOREST_CAR_FILE_EXTENSION) {
83 match std::fs::remove_file(&file) {
85 Ok(_) => {
86 info!("Deleted temp car DB at {}", file.display());
87 }
88 Err(e) => {
89 warn!("Failed to delete temp car DB at {}: {e}", file.display());
90 }
91 }
92 }
93 }
94 }
95
96 tracing::info!("Loaded {} CARs", store.len());
97
98 Ok(())
99}
100
/// Strategy used by [`import_chain_as_forest_car`] to bring a snapshot into
/// the Forest CAR database directory.
///
/// The `strum` `Display`/`EnumString` implementations use the lowercase
/// variant names (`serialize_all = "lowercase"`).
#[derive(
    Default,
    PartialEq,
    Eq,
    Debug,
    Clone,
    Copy,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
)]
#[strum(serialize_all = "lowercase")]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub enum ImportMode {
    /// Default. A URL is downloaded; a local file that is already a valid
    /// Forest CAR is hard-linked (falling back to a copy when linking fails);
    /// any other local file is imported as with [`ImportMode::Copy`].
    #[default]
    Auto,
    /// Copy the snapshot into the database directory (transcoding it when it
    /// is not a valid Forest CAR), leaving the original file in place.
    Copy,
    /// Move the snapshot into the database directory (transcoding it when it
    /// is not a valid Forest CAR); the original file is removed.
    Move,
    /// Create a symbolic link to the snapshot. The snapshot must already be a
    /// valid Forest CAR file, otherwise the import fails.
    Symlink,
    /// Create a hard link to the snapshot. The snapshot must already be a
    /// valid Forest CAR file, otherwise the import fails.
    Hardlink,
}
128
129pub async fn import_chain_as_forest_car(
132 from_path: &Path,
133 forest_car_db_dir: &Path,
134 import_mode: ImportMode,
135 rpc_endpoint: Url,
136 f3_root: &Path,
137 chain_config: &ChainConfig,
138 snapshot_progress_tracker: &SnapshotProgressTracker,
139) -> anyhow::Result<(PathBuf, Tipset)> {
140 info!("Importing chain from snapshot at: {}", from_path.display());
141
142 let stopwatch = time::Instant::now();
143
144 let forest_car_db_path = forest_car_db_dir.join(format!(
145 "{}{FOREST_CAR_FILE_EXTENSION}",
146 chrono::Utc::now().timestamp_millis()
147 ));
148
149 let move_or_copy = |mode: ImportMode| {
150 let forest_car_db_path = forest_car_db_path.clone();
151 async move {
152 let downloaded_car_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
153 if let Ok(url) = Url::parse(&from_path.display().to_string()) {
154 download_to(
155 &url,
156 &downloaded_car_temp_path,
157 DownloadFileOption::Resumable,
158 snapshot_progress_tracker.create_callback(),
159 )
160 .await?;
161
162 snapshot_progress_tracker.completed();
163 } else {
164 snapshot_progress_tracker.not_required();
165 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
166 move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
167 } else {
168 transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?;
170 if mode == ImportMode::Move {
171 std::fs::remove_file(from_path).context("Error removing original file")?;
172 }
173 }
174 }
175
176 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(
177 &downloaded_car_temp_path,
178 )?) {
179 downloaded_car_temp_path.persist(&forest_car_db_path)?;
180 } else {
181 let forest_car_db_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
183 transcode_into_forest_car(&downloaded_car_temp_path, &forest_car_db_temp_path)
184 .await?;
185 forest_car_db_temp_path.persist(&forest_car_db_path)?;
186 }
187 anyhow::Ok(())
188 }
189 };
190
191 match import_mode {
192 ImportMode::Auto => {
193 if Url::parse(&from_path.display().to_string()).is_ok() {
194 move_or_copy(ImportMode::Move).await?;
196 } else if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
197 tracing::info!(
198 "Hardlinking {} to {}",
199 from_path.display(),
200 forest_car_db_path.display()
201 );
202 if std::fs::hard_link(from_path, &forest_car_db_path).is_err() {
203 tracing::warn!("Error creating hardlink, fallback to copy");
204 move_or_copy(ImportMode::Copy).await?;
205 }
206 } else {
207 tracing::warn!(
208 "Snapshot file is not a valid forest.car.zst file, fallback to copy"
209 );
210 move_or_copy(ImportMode::Copy).await?;
211 }
212 }
213 ImportMode::Copy | ImportMode::Move => {
214 move_or_copy(import_mode).await?;
215 }
216 ImportMode::Symlink => {
217 let from_path = std::path::absolute(from_path)?;
218 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(&from_path)?) {
219 tracing::info!(
220 "Symlinking {} to {}",
221 from_path.display(),
222 forest_car_db_path.display()
223 );
224 std::os::unix::fs::symlink(from_path, &forest_car_db_path)
225 .context("Error creating symlink")?;
226 } else {
227 bail!("Snapshot file must be a valid forest.car.zst file");
228 }
229 }
230 ImportMode::Hardlink => {
231 if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
232 tracing::info!(
233 "Hardlinking {} to {}",
234 from_path.display(),
235 forest_car_db_path.display()
236 );
237 std::fs::hard_link(from_path, &forest_car_db_path)
238 .context("Error creating hardlink")?;
239 } else {
240 bail!("Snapshot file must be a valid forest.car.zst file");
241 }
242 }
243 };
244
245 let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?;
246
247 if let Some(f3_cid) = forest_car.metadata().and_then(|m| m.f3_data) {
248 if crate::f3::get_f3_sidecar_params(chain_config)
249 .initial_power_table
250 .is_none()
251 {
252 tracing::warn!(
254 "skipped importing F3 data as the initial power table CID is not set in the current manifest"
255 );
256 } else {
257 let mut f3_data = forest_car
258 .get_reader(f3_cid)?
259 .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
260 let mut temp_f3_snap = tempfile::Builder::new()
261 .suffix(".f3snap.bin")
262 .tempfile_in(forest_car_db_dir)?;
263 {
264 let f = temp_f3_snap.as_file_mut();
265 std::io::copy(&mut f3_data, f)?;
266 f.sync_all()?;
267 }
268 if let Err(e) = crate::f3::import_f3_snapshot(
269 chain_config,
270 rpc_endpoint.to_string(),
271 f3_root.display().to_string(),
272 temp_f3_snap.path().display().to_string(),
273 ) {
274 tracing::error!("Failed to import F3 snapshot: {e:#}");
276 }
277 }
278 }
279
280 let ts = forest_car.heaviest_tipset()?;
281 info!(
282 "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}",
283 stopwatch.elapsed().as_secs(),
284 ts.epoch(),
285 ts.key()
286 );
287
288 Ok((forest_car_db_path, ts))
289}
290
291fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow::Result<()> {
292 match import_mode {
293 ImportMode::Move => {
294 tracing::info!("Moving {} to {}", from.display(), to.display());
295 if fs::rename(from, to).is_ok() {
296 Ok(())
297 } else {
298 fs::copy(from, to).context("Error copying file")?;
299 fs::remove_file(from).context("Error removing original file")?;
300 Ok(())
301 }
302 }
303 ImportMode::Copy => {
304 tracing::info!("Copying {} to {}", from.display(), to.display());
305 fs::copy(from, to).map(|_| ()).context("Error copying file")
306 }
307 m => {
308 bail!("{m} must be handled elsewhere");
309 }
310 }
311}
312
313async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> {
314 tracing::info!(
315 from = %from.display(),
316 to = %to.display(),
317 "transcoding into forest car"
318 );
319 let car_stream = CarStream::new_from_path(from).await?;
320 let roots = car_stream.header_v1.roots.clone();
321
322 let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
323 let frames = crate::db::car::forest::Encoder::compress_stream_default(
324 car_stream.map_err(anyhow::Error::from),
325 );
326 crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
327 writer.shutdown().await?;
328
329 Ok(())
330}
331
332async fn process_ts<DB>(
333 ts: &Tipset,
334 state_manager: &Arc<StateManager<DB>>,
335 delegated_messages: &mut Vec<(crate::message::SignedMessage, u64)>,
336) -> anyhow::Result<()>
337where
338 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
339{
340 let epoch = ts.epoch();
341 let tsk = ts.key().clone();
342
343 state_manager.load_executed_tipset(ts).await?;
344
345 delegated_messages.append(
346 &mut state_manager
347 .chain_store()
348 .headers_delegated_messages(ts.block_headers().iter())?,
349 );
350 tracing::trace!("Indexing tipset @{}: {}", epoch, &tsk);
351 tsk.save(state_manager.blockstore())?;
352
353 Ok(())
354}
355
/// Selects how far back from the head tipset [`backfill_db`] walks the chain.
pub enum RangeSpec {
    /// Process tipsets for as long as their epoch is greater than or equal to
    /// the given epoch.
    To(ChainEpoch),
    /// Process at most the given number of tipsets, starting at the head.
    NumTipsets(usize),
}
360
361impl std::fmt::Display for RangeSpec {
362 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 match self {
364 RangeSpec::To(epoch) => write!(f, "To epoch: {}", epoch),
365 RangeSpec::NumTipsets(n) => write!(f, "Tipsets: {}", n),
366 }
367 }
368}
369
370pub async fn backfill_db<DB>(
379 state_manager: &Arc<StateManager<DB>>,
380 head_ts: &Tipset,
381 spec: RangeSpec,
382) -> anyhow::Result<()>
383where
384 DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
385{
386 tracing::info!("Starting index backfill...");
387
388 let mut delegated_messages = vec![];
389
390 let mut num_backfills = 0;
391
392 match spec {
393 RangeSpec::To(to_epoch) => {
394 for ts in head_ts
395 .shallow_clone()
396 .chain(&state_manager.chain_store().blockstore())
397 .take_while(|ts| ts.epoch() >= to_epoch)
398 {
399 process_ts(&ts, state_manager, &mut delegated_messages).await?;
400 num_backfills += 1;
401 }
402 }
403 RangeSpec::NumTipsets(n_tipsets) => {
404 for ts in head_ts
405 .shallow_clone()
406 .chain(&state_manager.chain_store().blockstore())
407 .take(n_tipsets)
408 {
409 process_ts(&ts, state_manager, &mut delegated_messages).await?;
410 num_backfills += 1;
411 }
412 }
413 }
414
415 state_manager
416 .chain_store()
417 .process_signed_messages(&delegated_messages)?;
418
419 tracing::info!("Total successful backfills: {}", num_backfills);
420
421 Ok(())
422}
423
#[cfg(test)]
mod test {
    use super::*;

    /// A plain CAR file can be imported by every mode that is allowed to
    /// transcode; linking modes must reject it.
    #[tokio::test]
    async fn import_snapshot_from_file_valid() {
        for import_mode in [ImportMode::Auto, ImportMode::Copy, ImportMode::Move] {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap();
        }

        // Linking requires a valid forest.car.zst file.
        for import_mode in [ImportMode::Symlink, ImportMode::Hardlink] {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    /// Same as above, but for a zstd-compressed plain CAR file.
    #[tokio::test]
    async fn import_snapshot_from_compressed_file_valid() {
        for import_mode in [ImportMode::Auto, ImportMode::Copy, ImportMode::Move] {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap();
        }

        // Linking requires a valid forest.car.zst file; exercise the
        // compressed fixture here rather than the uncompressed one.
        for import_mode in [ImportMode::Symlink, ImportMode::Hardlink] {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap_err();
        }
    }

    /// A Forest CAR file can be imported with every mode.
    #[tokio::test]
    async fn import_snapshot_from_forest_car_valid() {
        for import_mode in [
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("test-snapshots/chain4.forest.car.zst", import_mode)
                .await
                .unwrap();
        }
    }

    /// A file that is not a CAR at all is rejected by every mode.
    #[tokio::test]
    async fn import_snapshot_from_file_invalid() {
        for import_mode in &[
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("Cargo.toml", *import_mode)
                .await
                .unwrap_err();
        }
    }

    /// A missing file is rejected by every mode.
    #[tokio::test]
    async fn import_snapshot_from_file_not_found() {
        for import_mode in &[
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("dummy.car", *import_mode)
                .await
                .unwrap_err();
        }
    }

    /// A URL that does not resolve to a snapshot is rejected by every mode.
    ///
    /// NOTE(review): the helper copies `file_path` with `fs::copy` before
    /// importing, so the error observed here presumably comes from that copy
    /// rather than from a download attempt - confirm this is intended.
    #[tokio::test]
    async fn import_snapshot_from_url_not_found() {
        for import_mode in &[
            ImportMode::Auto,
            ImportMode::Copy,
            ImportMode::Move,
            ImportMode::Symlink,
            ImportMode::Hardlink,
        ] {
            import_snapshot_from_file("https://forest.chainsafe.io/dummy.car", *import_mode)
                .await
                .unwrap_err();
        }
    }

    /// Copies `file_path` to a temporary file, imports that copy into a fresh
    /// temporary DB directory with `import_mode`, and checks the resulting
    /// files on disk.
    async fn import_snapshot_from_file(
        file_path: &str,
        import_mode: ImportMode,
    ) -> anyhow::Result<()> {
        // Work on a copy so that `ImportMode::Move` cannot remove the fixture.
        let temp_file = tempfile::Builder::new().tempfile()?;
        fs::copy(Path::new(file_path), temp_file.path())?;
        let file_path = temp_file.path();

        let temp_db_dir = tempfile::Builder::new().tempdir()?;

        let (path, ts) = import_chain_as_forest_car(
            file_path,
            temp_db_dir.path(),
            import_mode,
            "http://127.0.0.1:2345/rpc/v1".parse().unwrap(),
            Path::new("test"),
            &ChainConfig::devnet(),
            &SnapshotProgressTracker::default(),
        )
        .await?;
        match import_mode {
            ImportMode::Symlink => {
                // The imported file must be a link pointing at the source.
                assert_eq!(
                    std::path::absolute(path.read_link()?)?,
                    std::path::absolute(file_path)?
                );
            }
            ImportMode::Move => {
                // The source is gone, the imported file exists.
                assert!(!file_path.exists());
                assert!(path.is_file());
            }
            _ => {
                // Both the source and the imported file exist.
                assert!(file_path.is_file());
                assert!(path.is_file());
            }
        }
        assert!(ts.epoch() > 0);
        Ok(())
    }
}