forest/daemon/
db_util.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::db::car::forest::{
6    FOREST_CAR_FILE_EXTENSION, TEMP_FOREST_CAR_FILE_EXTENSION, new_forest_car_temp_path_in,
7};
8use crate::db::car::{ForestCar, ManyCar};
9use crate::interpreter::VMTrace;
10use crate::networks::ChainConfig;
11use crate::rpc::sync::SnapshotProgressTracker;
12use crate::shim::clock::ChainEpoch;
13use crate::state_manager::{NO_CALLBACK, StateManager};
14use crate::utils::db::car_stream::CarStream;
15use crate::utils::io::EitherMmapOrRandomAccessFile;
16use crate::utils::net::{DownloadFileOption, download_to};
17use anyhow::{Context, bail};
18use futures::TryStreamExt;
19use serde::{Deserialize, Serialize};
20use std::{
21    ffi::OsStr,
22    fs,
23    path::{Path, PathBuf},
24    sync::Arc,
25    time,
26};
27use tokio::io::AsyncWriteExt;
28use tracing::{debug, info, warn};
29use url::Url;
30use walkdir::WalkDir;
31
32#[cfg(doc)]
33use crate::rpc::eth::types::EthHash;
34
35#[cfg(doc)]
36use crate::blocks::TipsetKey;
37
38#[cfg(doc)]
39use cid::Cid;
40
41/// Loads all `.forest.car.zst` snapshots and cleanup stale `.forest.car.zst.tmp` files.
42pub fn load_all_forest_cars_with_cleanup<T>(
43    store: &ManyCar<T>,
44    forest_car_db_dir: &Path,
45) -> anyhow::Result<()> {
46    load_all_forest_cars_internal(store, forest_car_db_dir, true)
47}
48
49/// Loads all `.forest.car.zst` snapshots
50pub fn load_all_forest_cars<T>(store: &ManyCar<T>, forest_car_db_dir: &Path) -> anyhow::Result<()> {
51    load_all_forest_cars_internal(store, forest_car_db_dir, false)
52}
53
54fn load_all_forest_cars_internal<T>(
55    store: &ManyCar<T>,
56    forest_car_db_dir: &Path,
57    cleanup: bool,
58) -> anyhow::Result<()> {
59    if !forest_car_db_dir.is_dir() {
60        fs::create_dir_all(forest_car_db_dir)?;
61    }
62    for file in WalkDir::new(forest_car_db_dir)
63        .max_depth(1)
64        .into_iter()
65        .filter_map(|e| {
66            e.ok().and_then(|e| {
67                if !e.file_type().is_dir() {
68                    Some(e.into_path())
69                } else {
70                    None
71                }
72            })
73        })
74    {
75        if let Some(filename) = file.file_name().and_then(OsStr::to_str) {
76            if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
77                let car = ForestCar::try_from(file.as_path())
78                    .with_context(|| format!("Error loading car DB at {}", file.display()))?;
79                store.read_only(car.into())?;
80                debug!("Loaded car DB at {}", file.display());
81            } else if cleanup && filename.ends_with(TEMP_FOREST_CAR_FILE_EXTENSION) {
82                // Only delete files that appear to be incomplete car DB files
83                match std::fs::remove_file(&file) {
84                    Ok(_) => {
85                        info!("Deleted temp car DB at {}", file.display());
86                    }
87                    Err(e) => {
88                        warn!("Failed to delete temp car DB at {}: {e}", file.display());
89                    }
90                }
91            }
92        }
93    }
94
95    tracing::info!("Loaded {} CARs", store.len());
96
97    Ok(())
98}
99
100#[derive(
101    Default,
102    PartialEq,
103    Eq,
104    Debug,
105    Clone,
106    Copy,
107    strum::Display,
108    strum::EnumString,
109    Serialize,
110    Deserialize,
111)]
112#[strum(serialize_all = "lowercase")]
113#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
114pub enum ImportMode {
115    #[default]
116    /// Hard link the snapshot and fallback to `Copy` if not applicable
117    Auto,
118    /// Copies the snapshot to the database directory.
119    Copy,
120    /// Moves the snapshot to the database directory (or copies and deletes the original).
121    Move,
122    /// Creates a symbolic link to the snapshot in the database directory.
123    Symlink,
124    /// Creates a symbolic link to the snapshot in the database directory.
125    Hardlink,
126}
127
128/// This function validates and stores the CAR binary from `from_path`(either local path or URL) into the `{DB_ROOT}/car_db/`
129/// (automatically trans-code into `.forest.car.zst` format when needed), and returns its final file path and the heaviest tipset.
130pub async fn import_chain_as_forest_car(
131    from_path: &Path,
132    forest_car_db_dir: &Path,
133    import_mode: ImportMode,
134    rpc_endpoint: Url,
135    f3_root: &Path,
136    chain_config: &ChainConfig,
137    snapshot_progress_tracker: &SnapshotProgressTracker,
138) -> anyhow::Result<(PathBuf, Tipset)> {
139    info!("Importing chain from snapshot at: {}", from_path.display());
140
141    let stopwatch = time::Instant::now();
142
143    let forest_car_db_path = forest_car_db_dir.join(format!(
144        "{}{FOREST_CAR_FILE_EXTENSION}",
145        chrono::Utc::now().timestamp_millis()
146    ));
147
148    let move_or_copy = |mode: ImportMode| {
149        let forest_car_db_path = forest_car_db_path.clone();
150        async move {
151            let downloaded_car_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
152            if let Ok(url) = Url::parse(&from_path.display().to_string()) {
153                download_to(
154                    &url,
155                    &downloaded_car_temp_path,
156                    DownloadFileOption::Resumable,
157                    snapshot_progress_tracker.create_callback(),
158                )
159                .await?;
160
161                snapshot_progress_tracker.completed();
162            } else {
163                snapshot_progress_tracker.not_required();
164                if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
165                    move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
166                } else {
167                    // For a local snapshot, we transcode directly instead of copying & transcoding.
168                    transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?;
169                    if mode == ImportMode::Move {
170                        std::fs::remove_file(from_path).context("Error removing original file")?;
171                    }
172                }
173            }
174
175            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(
176                &downloaded_car_temp_path,
177            )?) {
178                downloaded_car_temp_path.persist(&forest_car_db_path)?;
179            } else {
180                // Use another temp file to make sure all final `.forest.car.zst` files are complete and valid.
181                let forest_car_db_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
182                transcode_into_forest_car(&downloaded_car_temp_path, &forest_car_db_temp_path)
183                    .await?;
184                forest_car_db_temp_path.persist(&forest_car_db_path)?;
185            }
186            anyhow::Ok(())
187        }
188    };
189
190    match import_mode {
191        ImportMode::Auto => {
192            if Url::parse(&from_path.display().to_string()).is_ok() {
193                // Fallback to move if from_path is url
194                move_or_copy(ImportMode::Move).await?;
195            } else if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
196                tracing::info!(
197                    "Hardlinking {} to {}",
198                    from_path.display(),
199                    forest_car_db_path.display()
200                );
201                if std::fs::hard_link(from_path, &forest_car_db_path).is_err() {
202                    tracing::warn!("Error creating hardlink, fallback to copy");
203                    move_or_copy(ImportMode::Copy).await?;
204                }
205            } else {
206                tracing::warn!(
207                    "Snapshot file is not a valid forest.car.zst file, fallback to copy"
208                );
209                move_or_copy(ImportMode::Copy).await?;
210            }
211        }
212        ImportMode::Copy | ImportMode::Move => {
213            move_or_copy(import_mode).await?;
214        }
215        ImportMode::Symlink => {
216            let from_path = std::path::absolute(from_path)?;
217            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(&from_path)?) {
218                tracing::info!(
219                    "Symlinking {} to {}",
220                    from_path.display(),
221                    forest_car_db_path.display()
222                );
223                std::os::unix::fs::symlink(from_path, &forest_car_db_path)
224                    .context("Error creating symlink")?;
225            } else {
226                bail!("Snapshot file must be a valid forest.car.zst file");
227            }
228        }
229        ImportMode::Hardlink => {
230            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
231                tracing::info!(
232                    "Hardlinking {} to {}",
233                    from_path.display(),
234                    forest_car_db_path.display()
235                );
236                std::fs::hard_link(from_path, &forest_car_db_path)
237                    .context("Error creating hardlink")?;
238            } else {
239                bail!("Snapshot file must be a valid forest.car.zst file");
240            }
241        }
242    };
243
244    let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?;
245
246    if let Some(f3_cid) = forest_car.metadata().as_ref().and_then(|m| m.f3_data) {
247        if crate::f3::get_f3_sidecar_params(chain_config)
248            .initial_power_table
249            .is_none()
250        {
251            // To avoid importing old/wrong F3 data without initial power table check
252            tracing::warn!(
253                "skipped importing F3 data as the initial power table CID is not set in the current manifest"
254            );
255        } else {
256            let mut f3_data = forest_car
257                .get_reader(f3_cid)?
258                .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
259            let mut temp_f3_snap = tempfile::Builder::new()
260                .suffix(".f3snap.bin")
261                .tempfile_in(forest_car_db_dir)?;
262            {
263                let f = temp_f3_snap.as_file_mut();
264                std::io::copy(&mut f3_data, f)?;
265                f.sync_all()?;
266            }
267            if let Err(e) = crate::f3::import_f3_snapshot(
268                chain_config,
269                rpc_endpoint.to_string(),
270                f3_root.display().to_string(),
271                temp_f3_snap.path().display().to_string(),
272            ) {
273                // Do not make it a hard error if anything is wrong with F3 snapshot
274                tracing::error!("Failed to import F3 snapshot: {e}");
275            }
276        }
277    }
278
279    let ts = forest_car.heaviest_tipset()?;
280    info!(
281        "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}",
282        stopwatch.elapsed().as_secs(),
283        ts.epoch(),
284        ts.key()
285    );
286
287    Ok((forest_car_db_path, ts))
288}
289
290fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow::Result<()> {
291    match import_mode {
292        ImportMode::Move => {
293            tracing::info!("Moving {} to {}", from.display(), to.display());
294            if fs::rename(from, to).is_ok() {
295                Ok(())
296            } else {
297                fs::copy(from, to).context("Error copying file")?;
298                fs::remove_file(from).context("Error removing original file")?;
299                Ok(())
300            }
301        }
302        ImportMode::Copy => {
303            tracing::info!("Copying {} to {}", from.display(), to.display());
304            fs::copy(from, to).map(|_| ()).context("Error copying file")
305        }
306        m => {
307            bail!("{m} must be handled elsewhere");
308        }
309    }
310}
311
312async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> {
313    tracing::info!(
314        from = %from.display(),
315        to = %to.display(),
316        "transcoding into forest car"
317    );
318    let car_stream = CarStream::new_from_path(from).await?;
319    let roots = car_stream.header_v1.roots.clone();
320
321    let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
322    let frames = crate::db::car::forest::Encoder::compress_stream_default(
323        car_stream.map_err(anyhow::Error::from),
324    );
325    crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
326    writer.shutdown().await?;
327
328    Ok(())
329}
330
331async fn process_ts<DB>(
332    ts: &Tipset,
333    state_manager: &Arc<StateManager<DB>>,
334    delegated_messages: &mut Vec<(crate::message::SignedMessage, u64)>,
335) -> anyhow::Result<()>
336where
337    DB: fvm_ipld_blockstore::Blockstore + Send + Sync + 'static,
338{
339    let epoch = ts.epoch();
340    let tsk = ts.key().clone();
341
342    state_manager
343        .compute_tipset_state(ts.clone(), NO_CALLBACK, VMTrace::NotTraced)
344        .await?;
345
346    delegated_messages.append(
347        &mut state_manager
348            .chain_store()
349            .headers_delegated_messages(ts.block_headers().iter())?,
350    );
351    tracing::trace!("Indexing tipset @{}: {}", epoch, &tsk);
352    state_manager.chain_store().put_tipset_key(&tsk)?;
353
354    Ok(())
355}
356
357pub enum RangeSpec {
358    To(ChainEpoch),
359    NumTipsets(usize),
360}
361
362impl std::fmt::Display for RangeSpec {
363    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364        match self {
365            RangeSpec::To(epoch) => write!(f, "To epoch:      {}", epoch),
366            RangeSpec::NumTipsets(n) => write!(f, "Tipsets:       {}", n),
367        }
368    }
369}
370
371/// To support the Event RPC API, a new column has been added to parity-db to handle the mapping:
372/// - Events root [`Cid`] -> [`TipsetKey`].
373///
374/// Similarly, to support the Ethereum RPC API, another column has been introduced to map:
375/// - [`struct@EthHash`] -> [`TipsetKey`],
376/// - [`struct@EthHash`] -> Delegated message [`Cid`].
377///
378/// This function traverses the chain store and populates these columns accordingly.
379pub async fn backfill_db<DB>(
380    state_manager: &Arc<StateManager<DB>>,
381    head_ts: &Tipset,
382    spec: RangeSpec,
383) -> anyhow::Result<()>
384where
385    DB: fvm_ipld_blockstore::Blockstore + Send + Sync + 'static,
386{
387    tracing::info!("Starting index backfill...");
388
389    let mut delegated_messages = vec![];
390
391    let mut num_backfills = 0;
392
393    match spec {
394        RangeSpec::To(to_epoch) => {
395            for ts in head_ts
396                .clone()
397                .chain(&state_manager.chain_store().blockstore())
398                .take_while(|ts| ts.epoch() >= to_epoch)
399            {
400                process_ts(&ts, state_manager, &mut delegated_messages).await?;
401                num_backfills += 1;
402            }
403        }
404        RangeSpec::NumTipsets(n_tipsets) => {
405            for ts in head_ts
406                .clone()
407                .chain(&state_manager.chain_store().blockstore())
408                .take(n_tipsets)
409            {
410                process_ts(&ts, state_manager, &mut delegated_messages).await?;
411                num_backfills += 1;
412            }
413        }
414    }
415
416    state_manager
417        .chain_store()
418        .process_signed_messages(&delegated_messages)?;
419
420    tracing::info!("Total successful backfills: {}", num_backfills);
421
422    Ok(())
423}
424
#[cfg(test)]
mod test {
    use super::*;

    /// Modes that physically place the data into the database directory.
    const TRANSFER_MODES: [ImportMode; 3] = [ImportMode::Auto, ImportMode::Copy, ImportMode::Move];

    /// Modes that only link to the original file.
    const LINK_MODES: [ImportMode; 2] = [ImportMode::Symlink, ImportMode::Hardlink];

    /// Every import mode.
    const ALL_MODES: [ImportMode; 5] = [
        ImportMode::Auto,
        ImportMode::Copy,
        ImportMode::Move,
        ImportMode::Symlink,
        ImportMode::Hardlink,
    ];

    #[tokio::test]
    async fn import_snapshot_from_file_valid() {
        for import_mode in TRANSFER_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap();
        }

        // Linking is not supported for raw CAR files.
        for import_mode in LINK_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_compressed_file_valid() {
        for import_mode in TRANSFER_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap();
        }

        // Linking is not supported for compressed CAR files that are not forest CAR files.
        for import_mode in LINK_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_forest_car_valid() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("test-snapshots/chain4.forest.car.zst", import_mode)
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_invalid() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("Cargo.toml", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_not_found() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_url_not_found() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("https://forest.chainsafe.io/dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    /// Copies `file_path` to a temporary file, imports that copy with `import_mode` into a
    /// fresh temporary database directory and checks the resulting files.
    async fn import_snapshot_from_file(
        file_path: &str,
        import_mode: ImportMode,
    ) -> anyhow::Result<()> {
        // Prevent modifications on the original file, e.g., deletion via `ImportMode::Move`.
        let temp_file = tempfile::Builder::new().tempfile()?;
        fs::copy(Path::new(file_path), temp_file.path())?;
        let file_path = temp_file.path();

        let temp_db_dir = tempfile::Builder::new().tempdir()?;

        let (path, ts) = import_chain_as_forest_car(
            file_path,
            temp_db_dir.path(),
            import_mode,
            "http://127.0.0.1:2345/rpc/v1".parse().unwrap(),
            Path::new("test"),
            &ChainConfig::devnet(),
            &SnapshotProgressTracker::default(),
        )
        .await?;
        match import_mode {
            ImportMode::Symlink => {
                // The link must point back at the (absolute) source file.
                assert_eq!(
                    std::path::absolute(path.read_link()?)?,
                    std::path::absolute(file_path)?
                );
            }
            ImportMode::Move => {
                // Moving must consume the source file.
                assert!(!file_path.exists());
                assert!(path.is_file());
            }
            _ => {
                assert!(file_path.is_file());
                assert!(path.is_file());
            }
        }
        assert!(ts.epoch() > 0);
        Ok(())
    }
}