solana-core 1.14.13

Blockchain, Rebuilt for Scale
Documentation
use {
    solana_gossip::cluster_info::{
        ClusterInfo, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_SNAPSHOT_HASHES,
    },
    solana_perf::thread::renice_this_thread,
    solana_runtime::{
        snapshot_archive_info::SnapshotArchiveInfoGetter,
        snapshot_config::SnapshotConfig,
        snapshot_hash::{
            FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
            IncrementalSnapshotHashes, StartingSnapshotHashes,
        },
        snapshot_package::{retain_max_n_elements, PendingSnapshotPackage, SnapshotType},
        snapshot_utils,
    },
    solana_sdk::{clock::Slot, hash::Hash},
    std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread::{self, Builder, JoinHandle},
        time::Duration,
    },
};

pub struct SnapshotPackagerService {
    t_snapshot_packager: JoinHandle<()>,
}

impl SnapshotPackagerService {
    pub fn new(
        pending_snapshot_package: PendingSnapshotPackage,
        starting_snapshot_hashes: Option<StartingSnapshotHashes>,
        exit: &Arc<AtomicBool>,
        cluster_info: &Arc<ClusterInfo>,
        snapshot_config: SnapshotConfig,
        enable_gossip_push: bool,
    ) -> Self {
        let exit = exit.clone();
        let cluster_info = cluster_info.clone();
        let max_full_snapshot_hashes = std::cmp::min(
            MAX_SNAPSHOT_HASHES,
            snapshot_config.maximum_full_snapshot_archives_to_retain,
        );
        let max_incremental_snapshot_hashes = std::cmp::min(
            MAX_INCREMENTAL_SNAPSHOT_HASHES,
            snapshot_config.maximum_incremental_snapshot_archives_to_retain,
        );

        let t_snapshot_packager = Builder::new()
            .name("solSnapshotPkgr".to_string())
            .spawn(move || {
                renice_this_thread(snapshot_config.packager_thread_niceness_adj).unwrap();
                let mut snapshot_gossip_manager = if enable_gossip_push {
                    Some(SnapshotGossipManager {
                        cluster_info,
                        max_full_snapshot_hashes,
                        max_incremental_snapshot_hashes,
                        full_snapshot_hashes: FullSnapshotHashes::default(),
                        incremental_snapshot_hashes: IncrementalSnapshotHashes::default(),
                    })
                } else {
                    None
                };
                if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
                    snapshot_gossip_manager.push_starting_snapshot_hashes(starting_snapshot_hashes);
                }

                loop {
                    if exit.load(Ordering::Relaxed) {
                        break;
                    }

                    let snapshot_package = pending_snapshot_package.lock().unwrap().take();
                    if snapshot_package.is_none() {
                        std::thread::sleep(Duration::from_millis(100));
                        continue;
                    }
                    let snapshot_package = snapshot_package.unwrap();

                    // Archiving the snapshot package is not allowed to fail.
                    // AccountsBackgroundService calls `clean_accounts()` with a value for
                    // last_full_snapshot_slot that requires this archive call to succeed.
                    snapshot_utils::archive_snapshot_package(
                        &snapshot_package,
                        &snapshot_config.full_snapshot_archives_dir,
                        &snapshot_config.incremental_snapshot_archives_dir,
                        snapshot_config.maximum_full_snapshot_archives_to_retain,
                        snapshot_config.maximum_incremental_snapshot_archives_to_retain,
                    )
                    .expect("failed to archive snapshot package");

                    if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
                        snapshot_gossip_manager.push_snapshot_hash(
                            snapshot_package.snapshot_type,
                            (snapshot_package.slot(), *snapshot_package.hash()),
                        );
                    }
                }
            })
            .unwrap();

        Self {
            t_snapshot_packager,
        }
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_snapshot_packager.join()
    }
}

struct SnapshotGossipManager {
    cluster_info: Arc<ClusterInfo>,
    max_full_snapshot_hashes: usize,
    max_incremental_snapshot_hashes: usize,
    full_snapshot_hashes: FullSnapshotHashes,
    incremental_snapshot_hashes: IncrementalSnapshotHashes,
}

impl SnapshotGossipManager {
    /// If there were starting snapshot hashes, add those to their respective vectors, then push
    /// those vectors to the cluster via CRDS.
    fn push_starting_snapshot_hashes(
        &mut self,
        starting_snapshot_hashes: Option<StartingSnapshotHashes>,
    ) {
        if let Some(starting_snapshot_hashes) = starting_snapshot_hashes {
            let starting_full_snapshot_hash = starting_snapshot_hashes.full;
            self.push_full_snapshot_hash(starting_full_snapshot_hash);

            if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
                self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash);
            };
        }
    }

    /// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
    /// cluster via CRDS.
    fn push_snapshot_hash(&mut self, snapshot_type: SnapshotType, snapshot_hash: (Slot, Hash)) {
        match snapshot_type {
            SnapshotType::FullSnapshot => {
                self.push_full_snapshot_hash(FullSnapshotHash {
                    hash: snapshot_hash,
                });
            }
            SnapshotType::IncrementalSnapshot(base_slot) => {
                let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap();
                assert_eq!(
                    base_slot, latest_full_snapshot_hash.0,
                    "the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})",
                    base_slot, latest_full_snapshot_hash.0,
                );
                self.push_incremental_snapshot_hash(IncrementalSnapshotHash {
                    base: latest_full_snapshot_hash,
                    hash: snapshot_hash,
                });
            }
        }
    }

    /// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
    /// the cluster via CRDS.
    fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
        self.full_snapshot_hashes
            .hashes
            .push(full_snapshot_hash.hash);

        retain_max_n_elements(
            &mut self.full_snapshot_hashes.hashes,
            self.max_full_snapshot_hashes,
        );

        self.cluster_info
            .push_snapshot_hashes(self.full_snapshot_hashes.hashes.clone());
    }

    /// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
    /// that vector to the cluster via CRDS.
    fn push_incremental_snapshot_hash(
        &mut self,
        incremental_snapshot_hash: IncrementalSnapshotHash,
    ) {
        // If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then
        // that means the old incremental snapshot hashes are no longer valid, so clear them all
        // out.
        if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base {
            self.incremental_snapshot_hashes.hashes.clear();
            self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base;
        }

        self.incremental_snapshot_hashes
            .hashes
            .push(incremental_snapshot_hash.hash);

        retain_max_n_elements(
            &mut self.incremental_snapshot_hashes.hashes,
            self.max_incremental_snapshot_hashes,
        );

        // Pushing incremental snapshot hashes to the cluster should never fail.  The only error
        // case is when the length of the hashes is too big, but we account for that with
        // `max_incremental_snapshot_hashes`.  If this call ever does error, it's a programmer bug!
        // Check to see what changed in `push_incremental_snapshot_hashes()` and handle the new
        // error condition here.
        self.cluster_info
            .push_incremental_snapshot_hashes(
                self.incremental_snapshot_hashes.base,
                self.incremental_snapshot_hashes.hashes.clone(),
            )
            .expect(
                "Bug! The programmer contract has changed for push_incremental_snapshot_hashes() \
                 and a new error case has been added, which has not been handled here.",
            );
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        bincode::serialize_into,
        solana_runtime::{
            accounts_db::AccountStorageEntry,
            bank::BankSlotDelta,
            snapshot_archive_info::SnapshotArchiveInfo,
            snapshot_package::{SnapshotPackage, SnapshotType},
            snapshot_utils::{
                self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILENAME,
            },
        },
        solana_sdk::hash::Hash,
        std::{
            fs::{self, remove_dir_all, OpenOptions},
            io::Write,
            path::{Path, PathBuf},
        },
        tempfile::TempDir,
    };

    // Create temporary placeholder directory for all test files
    fn make_tmp_dir_path() -> PathBuf {
        let out_dir = std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
        let path = PathBuf::from(format!("{}/tmp/test_package_snapshots", out_dir));

        // whack any possible collision
        let _ignored = std::fs::remove_dir_all(&path);
        // whack any possible collision
        let _ignored = std::fs::remove_file(&path);

        path
    }

    #[test]
    fn test_package_snapshots_relative_ledger_path() {
        let temp_dir = make_tmp_dir_path();
        create_and_verify_snapshot(&temp_dir);
        remove_dir_all(temp_dir).expect("should remove tmp dir");
    }

    #[test]
    fn test_package_snapshots() {
        create_and_verify_snapshot(TempDir::new().unwrap().path())
    }

    fn create_and_verify_snapshot(temp_dir: &Path) {
        let accounts_dir = temp_dir.join("accounts");
        let snapshots_dir = temp_dir.join("snapshots");
        let full_snapshot_archives_dir = temp_dir.join("full_snapshot_archives");
        let incremental_snapshot_archives_dir = temp_dir.join("incremental_snapshot_archives");
        fs::create_dir_all(&full_snapshot_archives_dir).unwrap();
        fs::create_dir_all(&incremental_snapshot_archives_dir).unwrap();

        fs::create_dir_all(&accounts_dir).unwrap();
        // Create some storage entries
        let storage_entries: Vec<_> = (0..5)
            .map(|i| Arc::new(AccountStorageEntry::new(&accounts_dir, 0, i, 10)))
            .collect();

        // Create some fake snapshot
        let snapshots_paths: Vec<_> = (0..5)
            .map(|i| {
                let snapshot_file_name = format!("{}", i);
                let snapshots_dir = snapshots_dir.join(&snapshot_file_name);
                fs::create_dir_all(&snapshots_dir).unwrap();
                let fake_snapshot_path = snapshots_dir.join(&snapshot_file_name);
                let mut fake_snapshot_file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(&fake_snapshot_path)
                    .unwrap();

                fake_snapshot_file.write_all(b"Hello, world!").unwrap();
                fake_snapshot_path
            })
            .collect();

        // Create directory of hard links for snapshots
        let link_snapshots_dir = tempfile::tempdir_in(temp_dir).unwrap();
        for snapshots_path in snapshots_paths {
            let snapshot_file_name = snapshots_path.file_name().unwrap();
            let link_snapshots_dir = link_snapshots_dir.path().join(snapshot_file_name);
            fs::create_dir_all(&link_snapshots_dir).unwrap();
            let link_path = link_snapshots_dir.join(snapshot_file_name);
            fs::hard_link(&snapshots_path, &link_path).unwrap();
        }

        // Create a packageable snapshot
        let slot = 42;
        let hash = Hash::default();
        let archive_format = ArchiveFormat::TarBzip2;
        let output_tar_path = snapshot_utils::build_full_snapshot_archive_path(
            &full_snapshot_archives_dir,
            slot,
            &hash,
            archive_format,
        );
        let snapshot_package = SnapshotPackage {
            snapshot_archive_info: SnapshotArchiveInfo {
                path: output_tar_path.clone(),
                slot,
                hash,
                archive_format,
            },
            block_height: slot,
            slot_deltas: vec![],
            snapshot_links: link_snapshots_dir,
            snapshot_storages: vec![storage_entries],
            snapshot_version: SnapshotVersion::default(),
            snapshot_type: SnapshotType::FullSnapshot,
        };

        // Make tarball from packageable snapshot
        snapshot_utils::archive_snapshot_package(
            &snapshot_package,
            full_snapshot_archives_dir,
            incremental_snapshot_archives_dir,
            snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
            snapshot_utils::DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
        )
        .unwrap();

        // before we compare, stick an empty status_cache in this dir so that the package comparison works
        // This is needed since the status_cache is added by the packager and is not collected from
        // the source dir for snapshots
        let dummy_slot_deltas: Vec<BankSlotDelta> = vec![];
        snapshot_utils::serialize_snapshot_data_file(
            &snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
            |stream| {
                serialize_into(stream, &dummy_slot_deltas)?;
                Ok(())
            },
        )
        .unwrap();

        // Check archive is correct
        snapshot_utils::verify_snapshot_archive(
            output_tar_path,
            snapshots_dir,
            accounts_dir,
            archive_format,
            snapshot_utils::VerifyBank::Deterministic,
        );
    }
}