Skip to main content

forest/daemon/
db_util.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::db::car::forest::{
6    FOREST_CAR_FILE_EXTENSION, TEMP_FOREST_CAR_FILE_EXTENSION, new_forest_car_temp_path_in,
7};
8use crate::db::car::{ForestCar, ManyCar};
9use crate::networks::ChainConfig;
10use crate::prelude::*;
11use crate::rpc::sync::SnapshotProgressTracker;
12use crate::shim::clock::ChainEpoch;
13use crate::state_manager::StateManager;
14use crate::utils::db::car_stream::CarStream;
15use crate::utils::io::EitherMmapOrRandomAccessFile;
16use crate::utils::net::{DownloadFileOption, download_to};
17use anyhow::{Context, bail};
18use futures::TryStreamExt;
19use serde::{Deserialize, Serialize};
20use std::{
21    ffi::OsStr,
22    fs,
23    path::{Path, PathBuf},
24    time,
25};
26use tokio::io::AsyncWriteExt;
27use tracing::{debug, info, warn};
28use url::Url;
29use walkdir::WalkDir;
30
31#[cfg(doc)]
32use crate::rpc::eth::types::EthHash;
33
34#[cfg(doc)]
35use crate::blocks::TipsetKey;
36
37#[cfg(doc)]
38use cid::Cid;
39
40/// Loads all `.forest.car.zst` snapshots and cleanup stale `.forest.car.zst.tmp` files.
41pub fn load_all_forest_cars_with_cleanup<T>(
42    store: &ManyCar<T>,
43    forest_car_db_dir: &Path,
44) -> anyhow::Result<()> {
45    load_all_forest_cars_internal(store, forest_car_db_dir, true)
46}
47
48/// Loads all `.forest.car.zst` snapshots
49pub fn load_all_forest_cars<T>(store: &ManyCar<T>, forest_car_db_dir: &Path) -> anyhow::Result<()> {
50    load_all_forest_cars_internal(store, forest_car_db_dir, false)
51}
52
53fn load_all_forest_cars_internal<T>(
54    store: &ManyCar<T>,
55    forest_car_db_dir: &Path,
56    cleanup: bool,
57) -> anyhow::Result<()> {
58    if !forest_car_db_dir.is_dir() {
59        fs::create_dir_all(forest_car_db_dir)?;
60    }
61    for file in WalkDir::new(forest_car_db_dir)
62        .max_depth(1)
63        .into_iter()
64        .filter_map(|e| {
65            e.ok().and_then(|e| {
66                if !e.file_type().is_dir() {
67                    Some(e.into_path())
68                } else {
69                    None
70                }
71            })
72        })
73    {
74        if let Some(filename) = file.file_name().and_then(OsStr::to_str) {
75            if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
76                let car = ForestCar::try_from(file.as_path())
77                    .with_context(|| format!("Error loading car DB at {}", file.display()))?;
78                store.read_only(car.into())?;
79                debug!("Loaded car DB at {}", file.display());
80            } else if cleanup && filename.ends_with(TEMP_FOREST_CAR_FILE_EXTENSION) {
81                // Only delete files that appear to be incomplete car DB files
82                match std::fs::remove_file(&file) {
83                    Ok(_) => {
84                        info!("Deleted temp car DB at {}", file.display());
85                    }
86                    Err(e) => {
87                        warn!("Failed to delete temp car DB at {}: {e}", file.display());
88                    }
89                }
90            }
91        }
92    }
93
94    tracing::info!("Loaded {} CARs", store.len());
95
96    Ok(())
97}
98
99#[derive(
100    Default,
101    PartialEq,
102    Eq,
103    Debug,
104    Clone,
105    Copy,
106    strum::Display,
107    strum::EnumString,
108    Serialize,
109    Deserialize,
110)]
111#[strum(serialize_all = "lowercase")]
112#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
113pub enum ImportMode {
114    #[default]
115    /// Hard link the snapshot and fallback to `Copy` if not applicable
116    Auto,
117    /// Copies the snapshot to the database directory.
118    Copy,
119    /// Moves the snapshot to the database directory (or copies and deletes the original).
120    Move,
121    /// Creates a symbolic link to the snapshot in the database directory.
122    Symlink,
123    /// Creates a symbolic link to the snapshot in the database directory.
124    Hardlink,
125}
126
127/// This function validates and stores the CAR binary from `from_path`(either local path or URL) into the `{DB_ROOT}/car_db/`
128/// (automatically trans-code into `.forest.car.zst` format when needed), and returns its final file path and the heaviest tipset.
129pub async fn import_chain_as_forest_car(
130    from_path: &Path,
131    forest_car_db_dir: &Path,
132    import_mode: ImportMode,
133    rpc_endpoint: Url,
134    f3_root: &Path,
135    chain_config: &ChainConfig,
136    snapshot_progress_tracker: &SnapshotProgressTracker,
137) -> anyhow::Result<(PathBuf, Tipset)> {
138    info!("Importing chain from snapshot at: {}", from_path.display());
139
140    let stopwatch = time::Instant::now();
141
142    let forest_car_db_path = forest_car_db_dir.join(format!(
143        "{}{FOREST_CAR_FILE_EXTENSION}",
144        chrono::Utc::now().timestamp_millis()
145    ));
146
147    let move_or_copy = |mode: ImportMode| {
148        let forest_car_db_path = forest_car_db_path.clone();
149        async move {
150            let downloaded_car_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
151            if let Ok(url) = Url::parse(&from_path.display().to_string()) {
152                download_to(
153                    &url,
154                    &downloaded_car_temp_path,
155                    DownloadFileOption::Resumable,
156                    snapshot_progress_tracker.create_callback(),
157                )
158                .await?;
159
160                snapshot_progress_tracker.completed();
161            } else {
162                snapshot_progress_tracker.not_required();
163                if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
164                    move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
165                } else {
166                    // For a local snapshot, we transcode directly instead of copying & transcoding.
167                    transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?;
168                    if mode == ImportMode::Move {
169                        std::fs::remove_file(from_path).context("Error removing original file")?;
170                    }
171                }
172            }
173
174            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(
175                &downloaded_car_temp_path,
176            )?) {
177                downloaded_car_temp_path.persist(&forest_car_db_path)?;
178            } else {
179                // Use another temp file to make sure all final `.forest.car.zst` files are complete and valid.
180                let forest_car_db_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
181                transcode_into_forest_car(&downloaded_car_temp_path, &forest_car_db_temp_path)
182                    .await?;
183                forest_car_db_temp_path.persist(&forest_car_db_path)?;
184            }
185            anyhow::Ok(())
186        }
187    };
188
189    match import_mode {
190        ImportMode::Auto => {
191            if Url::parse(&from_path.display().to_string()).is_ok() {
192                // Fallback to move if from_path is url
193                move_or_copy(ImportMode::Move).await?;
194            } else if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
195                tracing::info!(
196                    "Hardlinking {} to {}",
197                    from_path.display(),
198                    forest_car_db_path.display()
199                );
200                if std::fs::hard_link(from_path, &forest_car_db_path).is_err() {
201                    tracing::warn!("Error creating hardlink, fallback to copy");
202                    move_or_copy(ImportMode::Copy).await?;
203                }
204            } else {
205                tracing::warn!(
206                    "Snapshot file is not a valid forest.car.zst file, fallback to copy"
207                );
208                move_or_copy(ImportMode::Copy).await?;
209            }
210        }
211        ImportMode::Copy | ImportMode::Move => {
212            move_or_copy(import_mode).await?;
213        }
214        ImportMode::Symlink => {
215            let from_path = std::path::absolute(from_path)?;
216            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(&from_path)?) {
217                tracing::info!(
218                    "Symlinking {} to {}",
219                    from_path.display(),
220                    forest_car_db_path.display()
221                );
222                std::os::unix::fs::symlink(from_path, &forest_car_db_path)
223                    .context("Error creating symlink")?;
224            } else {
225                bail!("Snapshot file must be a valid forest.car.zst file");
226            }
227        }
228        ImportMode::Hardlink => {
229            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
230                tracing::info!(
231                    "Hardlinking {} to {}",
232                    from_path.display(),
233                    forest_car_db_path.display()
234                );
235                std::fs::hard_link(from_path, &forest_car_db_path)
236                    .context("Error creating hardlink")?;
237            } else {
238                bail!("Snapshot file must be a valid forest.car.zst file");
239            }
240        }
241    };
242
243    let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?;
244
245    if let Some(f3_cid) = forest_car.metadata().and_then(|m| m.f3_data) {
246        if crate::f3::get_f3_sidecar_params(chain_config)
247            .initial_power_table
248            .is_none()
249        {
250            // To avoid importing old/wrong F3 data without initial power table check
251            tracing::warn!(
252                "skipped importing F3 data as the initial power table CID is not set in the current manifest"
253            );
254        } else {
255            let mut f3_data = forest_car
256                .get_reader(f3_cid)?
257                .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
258            let mut temp_f3_snap = tempfile::Builder::new()
259                .suffix(".f3snap.bin")
260                .tempfile_in(forest_car_db_dir)?;
261            {
262                let f = temp_f3_snap.as_file_mut();
263                std::io::copy(&mut f3_data, f)?;
264                f.sync_all()?;
265            }
266            if let Err(e) = crate::f3::import_f3_snapshot(
267                chain_config,
268                rpc_endpoint.to_string(),
269                f3_root.display().to_string(),
270                temp_f3_snap.path().display().to_string(),
271            ) {
272                // Do not make it a hard error if anything is wrong with F3 snapshot
273                tracing::error!("Failed to import F3 snapshot: {e:#}");
274            }
275        }
276    }
277
278    let ts = forest_car.heaviest_tipset()?;
279    info!(
280        "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}",
281        stopwatch.elapsed().as_secs(),
282        ts.epoch(),
283        ts.key()
284    );
285
286    Ok((forest_car_db_path, ts))
287}
288
289fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow::Result<()> {
290    match import_mode {
291        ImportMode::Move => {
292            tracing::info!("Moving {} to {}", from.display(), to.display());
293            if fs::rename(from, to).is_ok() {
294                Ok(())
295            } else {
296                fs::copy(from, to).context("Error copying file")?;
297                fs::remove_file(from).context("Error removing original file")?;
298                Ok(())
299            }
300        }
301        ImportMode::Copy => {
302            tracing::info!("Copying {} to {}", from.display(), to.display());
303            fs::copy(from, to).map(|_| ()).context("Error copying file")
304        }
305        m => {
306            bail!("{m} must be handled elsewhere");
307        }
308    }
309}
310
311async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> {
312    tracing::info!(
313        from = %from.display(),
314        to = %to.display(),
315        "transcoding into forest car"
316    );
317    let car_stream = CarStream::new_from_path(from).await?;
318    let roots = car_stream.header_v1.roots.clone();
319
320    let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
321    let frames = crate::db::car::forest::Encoder::compress_stream_default(
322        car_stream.map_err(anyhow::Error::from),
323    );
324    crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
325    writer.shutdown().await?;
326
327    Ok(())
328}
329
330async fn process_ts(
331    ts: &Tipset,
332    state_manager: &StateManager,
333    delegated_messages: &mut Vec<(crate::message::SignedMessage, u64)>,
334) -> anyhow::Result<()> {
335    let epoch = ts.epoch();
336    let tsk = ts.key().clone();
337
338    state_manager.load_executed_tipset(ts).await?;
339
340    delegated_messages.append(
341        &mut state_manager
342            .chain_store()
343            .headers_delegated_messages(ts.block_headers().iter())?,
344    );
345    tracing::trace!("Indexing tipset @{}: {}", epoch, &tsk);
346    tsk.save(state_manager.db())?;
347
348    Ok(())
349}
350
351pub enum RangeSpec {
352    To(ChainEpoch),
353    NumTipsets(usize),
354}
355
356impl std::fmt::Display for RangeSpec {
357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358        match self {
359            RangeSpec::To(epoch) => write!(f, "To epoch:      {}", epoch),
360            RangeSpec::NumTipsets(n) => write!(f, "Tipsets:       {}", n),
361        }
362    }
363}
364
365/// To support the Event RPC API, a new column has been added to parity-db to handle the mapping:
366/// - Events root [`Cid`] -> [`TipsetKey`].
367///
368/// Similarly, to support the Ethereum RPC API, another column has been introduced to map:
369/// - [`struct@EthHash`] -> [`TipsetKey`],
370/// - [`struct@EthHash`] -> Delegated message [`Cid`].
371///
372/// This function traverses the chain store and populates these columns accordingly.
373pub async fn backfill_db(
374    state_manager: &StateManager,
375    head_ts: &Tipset,
376    spec: RangeSpec,
377) -> anyhow::Result<()> {
378    tracing::info!("Starting index backfill...");
379
380    let mut delegated_messages = vec![];
381
382    let mut num_backfills = 0;
383
384    match spec {
385        RangeSpec::To(to_epoch) => {
386            for ts in head_ts
387                .shallow_clone()
388                .chain(&state_manager.chain_store().db())
389                .take_while(|ts| ts.epoch() >= to_epoch)
390            {
391                process_ts(&ts, state_manager, &mut delegated_messages).await?;
392                num_backfills += 1;
393            }
394        }
395        RangeSpec::NumTipsets(n_tipsets) => {
396            for ts in head_ts
397                .shallow_clone()
398                .chain(&state_manager.chain_store().db())
399                .take(n_tipsets)
400            {
401                process_ts(&ts, state_manager, &mut delegated_messages).await?;
402                num_backfills += 1;
403            }
404        }
405    }
406
407    state_manager
408        .chain_store()
409        .process_signed_messages(&delegated_messages)?;
410
411    tracing::info!("Total successful backfills: {}", num_backfills);
412
413    Ok(())
414}
415
#[cfg(test)]
mod test {
    use super::*;

    /// Modes that copy, move or transcode the snapshot into the database directory.
    const TRANSFER_MODES: [ImportMode; 3] = [ImportMode::Auto, ImportMode::Copy, ImportMode::Move];

    /// Modes that link to the snapshot and therefore require a valid forest CAR file.
    const LINK_MODES: [ImportMode; 2] = [ImportMode::Symlink, ImportMode::Hardlink];

    /// Every supported import mode.
    const ALL_MODES: [ImportMode; 5] = [
        ImportMode::Auto,
        ImportMode::Copy,
        ImportMode::Move,
        ImportMode::Symlink,
        ImportMode::Hardlink,
    ];

    #[tokio::test]
    async fn import_snapshot_from_file_valid() {
        for import_mode in TRANSFER_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap();
        }

        // Linking is not supported for raw CAR files.
        for import_mode in LINK_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_compressed_file_valid() {
        for import_mode in TRANSFER_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap();
        }

        // Linking is not supported for compressed CAR files that are not forest CAR files.
        for import_mode in LINK_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_forest_car_valid() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("test-snapshots/chain4.forest.car.zst", import_mode)
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_invalid() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("Cargo.toml", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_not_found() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_url_not_found() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("https://forest.chainsafe.io/dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    /// Imports a temporary copy of `file_path` into a fresh temporary database directory
    /// using `import_mode`, then checks the resulting files and the heaviest tipset epoch.
    async fn import_snapshot_from_file(
        file_path: &str,
        import_mode: ImportMode,
    ) -> anyhow::Result<()> {
        // Prevent modifications on the original file, e.g., deletion via `ImportMode::Move`.
        let temp_file = tempfile::Builder::new().tempfile()?;
        fs::copy(Path::new(file_path), temp_file.path())?;
        let file_path = temp_file.path();

        let temp_db_dir = tempfile::Builder::new().tempdir()?;

        let (path, ts) = import_chain_as_forest_car(
            file_path,
            temp_db_dir.path(),
            import_mode,
            "http://127.0.0.1:2345/rpc/v1".parse().unwrap(),
            Path::new("test"),
            &ChainConfig::devnet(),
            &SnapshotProgressTracker::default(),
        )
        .await?;
        match import_mode {
            ImportMode::Symlink => {
                // The symlink must point at the source file.
                assert_eq!(
                    std::path::absolute(path.read_link()?)?,
                    std::path::absolute(file_path)?
                );
            }
            ImportMode::Move => {
                // Moving consumes the source file.
                assert!(!file_path.exists());
                assert!(path.is_file());
            }
            _ => {
                assert!(file_path.is_file());
                assert!(path.is_file());
            }
        }
        assert!(ts.epoch() > 0);
        Ok(())
    }
}
552}