Skip to main content

forest/daemon/
db_util.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::db::car::forest::{
6    FOREST_CAR_FILE_EXTENSION, TEMP_FOREST_CAR_FILE_EXTENSION, new_forest_car_temp_path_in,
7};
8use crate::db::car::{ForestCar, ManyCar};
9use crate::db::{Blockstore, EthMappingsStore};
10use crate::networks::ChainConfig;
11use crate::rpc::sync::SnapshotProgressTracker;
12use crate::shim::clock::ChainEpoch;
13use crate::state_manager::StateManager;
14use crate::utils::ShallowClone as _;
15use crate::utils::db::car_stream::CarStream;
16use crate::utils::io::EitherMmapOrRandomAccessFile;
17use crate::utils::net::{DownloadFileOption, download_to};
18use anyhow::{Context, bail};
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use std::{
22    ffi::OsStr,
23    fs,
24    path::{Path, PathBuf},
25    sync::Arc,
26    time,
27};
28use tokio::io::AsyncWriteExt;
29use tracing::{debug, info, warn};
30use url::Url;
31use walkdir::WalkDir;
32
33#[cfg(doc)]
34use crate::rpc::eth::types::EthHash;
35
36#[cfg(doc)]
37use crate::blocks::TipsetKey;
38
39#[cfg(doc)]
40use cid::Cid;
41
42/// Loads all `.forest.car.zst` snapshots and cleanup stale `.forest.car.zst.tmp` files.
43pub fn load_all_forest_cars_with_cleanup<T>(
44    store: &ManyCar<T>,
45    forest_car_db_dir: &Path,
46) -> anyhow::Result<()> {
47    load_all_forest_cars_internal(store, forest_car_db_dir, true)
48}
49
50/// Loads all `.forest.car.zst` snapshots
51pub fn load_all_forest_cars<T>(store: &ManyCar<T>, forest_car_db_dir: &Path) -> anyhow::Result<()> {
52    load_all_forest_cars_internal(store, forest_car_db_dir, false)
53}
54
55fn load_all_forest_cars_internal<T>(
56    store: &ManyCar<T>,
57    forest_car_db_dir: &Path,
58    cleanup: bool,
59) -> anyhow::Result<()> {
60    if !forest_car_db_dir.is_dir() {
61        fs::create_dir_all(forest_car_db_dir)?;
62    }
63    for file in WalkDir::new(forest_car_db_dir)
64        .max_depth(1)
65        .into_iter()
66        .filter_map(|e| {
67            e.ok().and_then(|e| {
68                if !e.file_type().is_dir() {
69                    Some(e.into_path())
70                } else {
71                    None
72                }
73            })
74        })
75    {
76        if let Some(filename) = file.file_name().and_then(OsStr::to_str) {
77            if filename.ends_with(FOREST_CAR_FILE_EXTENSION) {
78                let car = ForestCar::try_from(file.as_path())
79                    .with_context(|| format!("Error loading car DB at {}", file.display()))?;
80                store.read_only(car.into())?;
81                debug!("Loaded car DB at {}", file.display());
82            } else if cleanup && filename.ends_with(TEMP_FOREST_CAR_FILE_EXTENSION) {
83                // Only delete files that appear to be incomplete car DB files
84                match std::fs::remove_file(&file) {
85                    Ok(_) => {
86                        info!("Deleted temp car DB at {}", file.display());
87                    }
88                    Err(e) => {
89                        warn!("Failed to delete temp car DB at {}: {e}", file.display());
90                    }
91                }
92            }
93        }
94    }
95
96    tracing::info!("Loaded {} CARs", store.len());
97
98    Ok(())
99}
100
101#[derive(
102    Default,
103    PartialEq,
104    Eq,
105    Debug,
106    Clone,
107    Copy,
108    strum::Display,
109    strum::EnumString,
110    Serialize,
111    Deserialize,
112)]
113#[strum(serialize_all = "lowercase")]
114#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
115pub enum ImportMode {
116    #[default]
117    /// Hard link the snapshot and fallback to `Copy` if not applicable
118    Auto,
119    /// Copies the snapshot to the database directory.
120    Copy,
121    /// Moves the snapshot to the database directory (or copies and deletes the original).
122    Move,
123    /// Creates a symbolic link to the snapshot in the database directory.
124    Symlink,
125    /// Creates a symbolic link to the snapshot in the database directory.
126    Hardlink,
127}
128
129/// This function validates and stores the CAR binary from `from_path`(either local path or URL) into the `{DB_ROOT}/car_db/`
130/// (automatically trans-code into `.forest.car.zst` format when needed), and returns its final file path and the heaviest tipset.
131pub async fn import_chain_as_forest_car(
132    from_path: &Path,
133    forest_car_db_dir: &Path,
134    import_mode: ImportMode,
135    rpc_endpoint: Url,
136    f3_root: &Path,
137    chain_config: &ChainConfig,
138    snapshot_progress_tracker: &SnapshotProgressTracker,
139) -> anyhow::Result<(PathBuf, Tipset)> {
140    info!("Importing chain from snapshot at: {}", from_path.display());
141
142    let stopwatch = time::Instant::now();
143
144    let forest_car_db_path = forest_car_db_dir.join(format!(
145        "{}{FOREST_CAR_FILE_EXTENSION}",
146        chrono::Utc::now().timestamp_millis()
147    ));
148
149    let move_or_copy = |mode: ImportMode| {
150        let forest_car_db_path = forest_car_db_path.clone();
151        async move {
152            let downloaded_car_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
153            if let Ok(url) = Url::parse(&from_path.display().to_string()) {
154                download_to(
155                    &url,
156                    &downloaded_car_temp_path,
157                    DownloadFileOption::Resumable,
158                    snapshot_progress_tracker.create_callback(),
159                )
160                .await?;
161
162                snapshot_progress_tracker.completed();
163            } else {
164                snapshot_progress_tracker.not_required();
165                if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
166                    move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?;
167                } else {
168                    // For a local snapshot, we transcode directly instead of copying & transcoding.
169                    transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?;
170                    if mode == ImportMode::Move {
171                        std::fs::remove_file(from_path).context("Error removing original file")?;
172                    }
173                }
174            }
175
176            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(
177                &downloaded_car_temp_path,
178            )?) {
179                downloaded_car_temp_path.persist(&forest_car_db_path)?;
180            } else {
181                // Use another temp file to make sure all final `.forest.car.zst` files are complete and valid.
182                let forest_car_db_temp_path = new_forest_car_temp_path_in(forest_car_db_dir)?;
183                transcode_into_forest_car(&downloaded_car_temp_path, &forest_car_db_temp_path)
184                    .await?;
185                forest_car_db_temp_path.persist(&forest_car_db_path)?;
186            }
187            anyhow::Ok(())
188        }
189    };
190
191    match import_mode {
192        ImportMode::Auto => {
193            if Url::parse(&from_path.display().to_string()).is_ok() {
194                // Fallback to move if from_path is url
195                move_or_copy(ImportMode::Move).await?;
196            } else if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
197                tracing::info!(
198                    "Hardlinking {} to {}",
199                    from_path.display(),
200                    forest_car_db_path.display()
201                );
202                if std::fs::hard_link(from_path, &forest_car_db_path).is_err() {
203                    tracing::warn!("Error creating hardlink, fallback to copy");
204                    move_or_copy(ImportMode::Copy).await?;
205                }
206            } else {
207                tracing::warn!(
208                    "Snapshot file is not a valid forest.car.zst file, fallback to copy"
209                );
210                move_or_copy(ImportMode::Copy).await?;
211            }
212        }
213        ImportMode::Copy | ImportMode::Move => {
214            move_or_copy(import_mode).await?;
215        }
216        ImportMode::Symlink => {
217            let from_path = std::path::absolute(from_path)?;
218            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(&from_path)?) {
219                tracing::info!(
220                    "Symlinking {} to {}",
221                    from_path.display(),
222                    forest_car_db_path.display()
223                );
224                std::os::unix::fs::symlink(from_path, &forest_car_db_path)
225                    .context("Error creating symlink")?;
226            } else {
227                bail!("Snapshot file must be a valid forest.car.zst file");
228            }
229        }
230        ImportMode::Hardlink => {
231            if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) {
232                tracing::info!(
233                    "Hardlinking {} to {}",
234                    from_path.display(),
235                    forest_car_db_path.display()
236                );
237                std::fs::hard_link(from_path, &forest_car_db_path)
238                    .context("Error creating hardlink")?;
239            } else {
240                bail!("Snapshot file must be a valid forest.car.zst file");
241            }
242        }
243    };
244
245    let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?;
246
247    if let Some(f3_cid) = forest_car.metadata().and_then(|m| m.f3_data) {
248        if crate::f3::get_f3_sidecar_params(chain_config)
249            .initial_power_table
250            .is_none()
251        {
252            // To avoid importing old/wrong F3 data without initial power table check
253            tracing::warn!(
254                "skipped importing F3 data as the initial power table CID is not set in the current manifest"
255            );
256        } else {
257            let mut f3_data = forest_car
258                .get_reader(f3_cid)?
259                .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?;
260            let mut temp_f3_snap = tempfile::Builder::new()
261                .suffix(".f3snap.bin")
262                .tempfile_in(forest_car_db_dir)?;
263            {
264                let f = temp_f3_snap.as_file_mut();
265                std::io::copy(&mut f3_data, f)?;
266                f.sync_all()?;
267            }
268            if let Err(e) = crate::f3::import_f3_snapshot(
269                chain_config,
270                rpc_endpoint.to_string(),
271                f3_root.display().to_string(),
272                temp_f3_snap.path().display().to_string(),
273            ) {
274                // Do not make it a hard error if anything is wrong with F3 snapshot
275                tracing::error!("Failed to import F3 snapshot: {e:#}");
276            }
277        }
278    }
279
280    let ts = forest_car.heaviest_tipset()?;
281    info!(
282        "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}",
283        stopwatch.elapsed().as_secs(),
284        ts.epoch(),
285        ts.key()
286    );
287
288    Ok((forest_car_db_path, ts))
289}
290
291fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow::Result<()> {
292    match import_mode {
293        ImportMode::Move => {
294            tracing::info!("Moving {} to {}", from.display(), to.display());
295            if fs::rename(from, to).is_ok() {
296                Ok(())
297            } else {
298                fs::copy(from, to).context("Error copying file")?;
299                fs::remove_file(from).context("Error removing original file")?;
300                Ok(())
301            }
302        }
303        ImportMode::Copy => {
304            tracing::info!("Copying {} to {}", from.display(), to.display());
305            fs::copy(from, to).map(|_| ()).context("Error copying file")
306        }
307        m => {
308            bail!("{m} must be handled elsewhere");
309        }
310    }
311}
312
313async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> {
314    tracing::info!(
315        from = %from.display(),
316        to = %to.display(),
317        "transcoding into forest car"
318    );
319    let car_stream = CarStream::new_from_path(from).await?;
320    let roots = car_stream.header_v1.roots.clone();
321
322    let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?);
323    let frames = crate::db::car::forest::Encoder::compress_stream_default(
324        car_stream.map_err(anyhow::Error::from),
325    );
326    crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
327    writer.shutdown().await?;
328
329    Ok(())
330}
331
332async fn process_ts<DB>(
333    ts: &Tipset,
334    state_manager: &Arc<StateManager<DB>>,
335    delegated_messages: &mut Vec<(crate::message::SignedMessage, u64)>,
336) -> anyhow::Result<()>
337where
338    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
339{
340    let epoch = ts.epoch();
341    let tsk = ts.key().clone();
342
343    state_manager.load_executed_tipset(ts).await?;
344
345    delegated_messages.append(
346        &mut state_manager
347            .chain_store()
348            .headers_delegated_messages(ts.block_headers().iter())?,
349    );
350    tracing::trace!("Indexing tipset @{}: {}", epoch, &tsk);
351    tsk.save(state_manager.blockstore())?;
352
353    Ok(())
354}
355
356pub enum RangeSpec {
357    To(ChainEpoch),
358    NumTipsets(usize),
359}
360
361impl std::fmt::Display for RangeSpec {
362    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363        match self {
364            RangeSpec::To(epoch) => write!(f, "To epoch:      {}", epoch),
365            RangeSpec::NumTipsets(n) => write!(f, "Tipsets:       {}", n),
366        }
367    }
368}
369
370/// To support the Event RPC API, a new column has been added to parity-db to handle the mapping:
371/// - Events root [`Cid`] -> [`TipsetKey`].
372///
373/// Similarly, to support the Ethereum RPC API, another column has been introduced to map:
374/// - [`struct@EthHash`] -> [`TipsetKey`],
375/// - [`struct@EthHash`] -> Delegated message [`Cid`].
376///
377/// This function traverses the chain store and populates these columns accordingly.
378pub async fn backfill_db<DB>(
379    state_manager: &Arc<StateManager<DB>>,
380    head_ts: &Tipset,
381    spec: RangeSpec,
382) -> anyhow::Result<()>
383where
384    DB: Blockstore + EthMappingsStore + Send + Sync + 'static,
385{
386    tracing::info!("Starting index backfill...");
387
388    let mut delegated_messages = vec![];
389
390    let mut num_backfills = 0;
391
392    match spec {
393        RangeSpec::To(to_epoch) => {
394            for ts in head_ts
395                .shallow_clone()
396                .chain(&state_manager.chain_store().blockstore())
397                .take_while(|ts| ts.epoch() >= to_epoch)
398            {
399                process_ts(&ts, state_manager, &mut delegated_messages).await?;
400                num_backfills += 1;
401            }
402        }
403        RangeSpec::NumTipsets(n_tipsets) => {
404            for ts in head_ts
405                .shallow_clone()
406                .chain(&state_manager.chain_store().blockstore())
407                .take(n_tipsets)
408            {
409                process_ts(&ts, state_manager, &mut delegated_messages).await?;
410                num_backfills += 1;
411            }
412        }
413    }
414
415    state_manager
416        .chain_store()
417        .process_signed_messages(&delegated_messages)?;
418
419    tracing::info!("Total successful backfills: {}", num_backfills);
420
421    Ok(())
422}
423
#[cfg(test)]
mod test {
    use super::*;

    /// Modes that transfer (and, when needed, transcode) the snapshot data.
    const TRANSFER_MODES: [ImportMode; 3] = [ImportMode::Auto, ImportMode::Copy, ImportMode::Move];

    /// Modes that only link to the source, which therefore must already be a forest CAR file.
    const LINK_MODES: [ImportMode; 2] = [ImportMode::Symlink, ImportMode::Hardlink];

    /// Every import mode.
    const ALL_MODES: [ImportMode; 5] = [
        ImportMode::Auto,
        ImportMode::Copy,
        ImportMode::Move,
        ImportMode::Symlink,
        ImportMode::Hardlink,
    ];

    #[tokio::test]
    async fn import_snapshot_from_file_valid() {
        for import_mode in TRANSFER_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap();
        }

        // Linking is not supported for raw CAR files.
        for import_mode in LINK_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_compressed_file_valid() {
        for import_mode in TRANSFER_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap();
        }

        // Linking is not supported for compressed CAR files that are not in forest CAR format.
        for import_mode in LINK_MODES {
            import_snapshot_from_file("test-snapshots/chain4.car.zst", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_forest_car_valid() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("test-snapshots/chain4.forest.car.zst", import_mode)
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_invalid() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("Cargo.toml", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_file_not_found() {
        for import_mode in ALL_MODES {
            import_snapshot_from_file("dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn import_snapshot_from_url_not_found() {
        // NOTE: the helper first copies `file_path` with `fs::copy`, so for a URL it already
        // fails at that step and no download is attempted.
        for import_mode in ALL_MODES {
            import_snapshot_from_file("https://forest.chainsafe.io/dummy.car", import_mode)
                .await
                .unwrap_err();
        }
    }

    /// Imports a copy of `file_path` into a fresh temporary database directory using
    /// `import_mode`, then checks what happened to the source and destination files and that
    /// the heaviest tipset has a positive epoch.
    async fn import_snapshot_from_file(
        file_path: &str,
        import_mode: ImportMode,
    ) -> anyhow::Result<()> {
        // Prevent modifications on the original file, e.g., deletion via `ImportMode::Move`.
        let temp_file = tempfile::Builder::new().tempfile()?;
        fs::copy(Path::new(file_path), temp_file.path())?;
        let file_path = temp_file.path();

        let temp_db_dir = tempfile::Builder::new().tempdir()?;

        let (path, ts) = import_chain_as_forest_car(
            file_path,
            temp_db_dir.path(),
            import_mode,
            "http://127.0.0.1:2345/rpc/v1".parse().unwrap(),
            Path::new("test"),
            &ChainConfig::devnet(),
            &SnapshotProgressTracker::default(),
        )
        .await?;
        match import_mode {
            ImportMode::Symlink => {
                // The imported path must be a symlink pointing at the source file.
                assert_eq!(
                    std::path::absolute(path.read_link()?)?,
                    std::path::absolute(file_path)?
                );
            }
            ImportMode::Move => {
                // The source is gone and only the imported file remains.
                assert!(!file_path.exists());
                assert!(path.is_file());
            }
            _ => {
                // The source is kept next to the imported file.
                assert!(file_path.is_file());
                assert!(path.is_file());
            }
        }
        assert!(ts.epoch() > 0);
        Ok(())
    }
}