solana-core 4.0.0-beta.6

Blockchain, Rebuilt for Scale
Documentation
mod snapshot_gossip_manager;
use {
    agave_snapshots::{
        SnapshotKind, paths as snapshot_paths, snapshot_config::SnapshotConfig,
        snapshot_hash::StartingSnapshotHashes,
    },
    snapshot_gossip_manager::SnapshotGossipManager,
    solana_accounts_db::account_storage_entry::AccountStorageEntry,
    solana_clock::Slot,
    solana_gossip::cluster_info::ClusterInfo,
    solana_measure::{meas_dur, measure::Measure, measure_us},
    solana_perf::thread::renice_this_thread,
    solana_runtime::{
        accounts_background_service::PendingSnapshotPackages,
        snapshot_controller::SnapshotController,
        snapshot_package::{BankSnapshotPackage, SnapshotPackage},
        snapshot_utils,
    },
    std::{
        sync::{
            Arc, Mutex,
            atomic::{AtomicBool, Ordering},
        },
        thread::{self, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

pub struct SnapshotPackagerService {
    t_snapshot_packager: JoinHandle<()>,
}

impl SnapshotPackagerService {
    pub const NAME: &str = "SnapshotPackagerService";

    /// If there are no snapshot packages to handle, limit how often we re-check
    const LOOP_LIMITER: Duration = Duration::from_millis(100);

    pub fn new(
        pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
        starting_snapshot_hashes: Option<StartingSnapshotHashes>,
        exit: Arc<AtomicBool>,
        exit_backpressure: Option<Arc<AtomicBool>>,
        cluster_info: Arc<ClusterInfo>,
        snapshot_controller: Arc<SnapshotController>,
        enable_gossip_push: bool,
        niceness_adj: i8,
    ) -> Self {
        let t_snapshot_packager = Builder::new()
            .name("solSnapshotPkgr".to_string())
            .spawn(move || {
                if let Some(exit_backpressure) = &exit_backpressure {
                    exit_backpressure.store(true, Ordering::Relaxed);
                }
                info!("{} has started", Self::NAME);
                let snapshot_config = snapshot_controller.snapshot_config();
                renice_this_thread(niceness_adj).unwrap();
                let mut snapshot_gossip_manager = enable_gossip_push
                    .then(|| SnapshotGossipManager::new(cluster_info, starting_snapshot_hashes));

                let mut teardown_state = None;
                loop {
                    if exit.load(Ordering::Relaxed) {
                        if let Some(teardown_state) = teardown_state {
                            info!("Received exit request, tearing down...");
                            let (_, dur) = meas_dur!(Self::teardown(
                                teardown_state,
                                snapshot_controller.snapshot_config(),
                            ));
                            info!("Teardown completed in {dur:?}.");
                        }
                        break;
                    }

                    let Some(snapshot_package) =
                        Self::get_next_snapshot_package(&pending_snapshot_packages)
                    else {
                        std::thread::sleep(Self::LOOP_LIMITER);
                        continue;
                    };
                    info!("handling snapshot package: {snapshot_package:?}");
                    let enqueued_time = snapshot_package.enqueued.elapsed();

                    let measure_handling = Measure::start("");
                    let snapshot_kind = snapshot_package.snapshot_kind;
                    let snapshot_slot = snapshot_package.slot;
                    let snapshot_hash = snapshot_package.hash;

                    snapshot_controller.set_latest_bank_snapshot_slot(snapshot_slot);

                    if exit_backpressure.is_some() {
                        // With exit backpressure, we will delay flushing snapshot storages
                        // until we receive a graceful exit request.
                        // Save the snapshot storages here, so we can flush later (as needed).
                        // For fastboot snapshot packages, the bank snapshot is saved and
                        // the rest of the snapshotting process is skipped
                        if snapshot_kind == SnapshotKind::Fastboot {
                            teardown_state = Some(TeardownState {
                                snapshot_slot: snapshot_package.slot,
                                snapshot_storages: snapshot_package.snapshot_storages.clone(),
                                bank_snapshot_package: Some(snapshot_package.bank_snapshot_package),
                            });

                            let handling_time = measure_handling.end_as_us();
                            datapoint_info!(
                                "snapshot_packager_service",
                                ("enqueued_time_us", enqueued_time.as_micros(), i64),
                                ("handling_time_us", handling_time, i64),
                            );

                            continue;
                        } else {
                            teardown_state = Some(TeardownState {
                                snapshot_slot: snapshot_package.slot,
                                snapshot_storages: snapshot_package.snapshot_storages.clone(),
                                bank_snapshot_package: None,
                            });
                        }
                    }

                    let archive_time = Instant::now();
                    // Serializing the snapshot package is not allowed to fail, as archiving is
                    // not allowed to fail (see comment on archive_snapshot_package below
                    let bank_snapshot_info = snapshot_utils::serialize_snapshot(
                        &snapshot_config.bank_snapshots_dir,
                        snapshot_config.snapshot_version,
                        snapshot_package.bank_snapshot_package,
                        snapshot_package.snapshot_storages.as_slice(),
                        exit_backpressure.is_none(),
                    );

                    let Ok(bank_snapshot_info) = bank_snapshot_info else {
                        let err = bank_snapshot_info.unwrap_err();
                        error!(
                            "Stopping {}! Fatal error while serializing snapshot for slot \
                             {snapshot_slot}: {err}",
                            Self::NAME,
                        );
                        exit.store(true, Ordering::Relaxed);
                        break;
                    };

                    if let SnapshotKind::Archive(snapshot_archive_kind) = snapshot_kind {
                        // Archiving the snapshot package is not allowed to fail.
                        // AccountsBackgroundService calls `clean_accounts()` with a value for
                        // latest_full_snapshot_slot that requires this archive call to succeed.
                        if let Err(err) = snapshot_utils::archive_snapshot_package(
                            snapshot_archive_kind,
                            snapshot_slot,
                            snapshot_hash,
                            &bank_snapshot_info.snapshot_dir,
                            snapshot_package.snapshot_storages,
                            snapshot_config,
                        ) {
                            error!(
                                "Stopping {}! Fatal error while archiving snapshot package: {err}",
                                Self::NAME,
                            );
                            exit.store(true, Ordering::Relaxed);
                            break;
                        }
                    }
                    let archive_time_us = archive_time.elapsed().as_micros();

                    if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
                        snapshot_gossip_manager
                            .push_snapshot_hash(snapshot_kind, (snapshot_slot, snapshot_hash));
                    }

                    let (_, purge_archives_time_us) =
                        measure_us!(snapshot_utils::purge_old_snapshot_archives(
                            &snapshot_config.full_snapshot_archives_dir,
                            &snapshot_config.incremental_snapshot_archives_dir,
                            snapshot_config.maximum_full_snapshot_archives_to_retain,
                            snapshot_config.maximum_incremental_snapshot_archives_to_retain,
                        ));

                    // Now that this snapshot package has been archived, it is safe to remove
                    // all bank snapshots older than this slot.  We want to keep the bank
                    // snapshot *at this slot* so that it can be used during restarts, when
                    // booting from local state.
                    let (_, purge_bank_snapshots_time_us) =
                        measure_us!(snapshot_utils::purge_bank_snapshots_older_than_slot(
                            &snapshot_config.bank_snapshots_dir,
                            snapshot_slot,
                        ));

                    let handling_time_us = measure_handling.end_as_us();
                    datapoint_info!(
                        "snapshot_packager_service",
                        ("enqueued_time_us", enqueued_time.as_micros(), i64),
                        ("handling_time_us", handling_time_us, i64),
                        ("archive_time_us", archive_time_us, i64),
                        (
                            "purge_old_snapshots_time_us",
                            purge_bank_snapshots_time_us,
                            i64
                        ),
                        ("purge_old_archives_time_us", purge_archives_time_us, i64),
                    );
                }
                info!("{} has stopped", Self::NAME);
                if let Some(exit_backpressure) = &exit_backpressure {
                    exit_backpressure.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();

        Self {
            t_snapshot_packager,
        }
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_snapshot_packager.join()
    }

    /// Returns the next snapshot package to handle
    fn get_next_snapshot_package(
        pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
    ) -> Option<SnapshotPackage> {
        pending_snapshot_packages.lock().unwrap().pop()
    }

    /// Performs final operations before gracefully shutting down
    fn teardown(state: TeardownState, snapshot_config: &SnapshotConfig) {
        let TeardownState {
            snapshot_slot,
            snapshot_storages,
            bank_snapshot_package,
        } = state;

        if let Some(bank_snapshot_package) = bank_snapshot_package {
            info!("Serializing bank snapshot...");
            let start = Instant::now();
            let result = snapshot_utils::serialize_snapshot(
                &snapshot_config.bank_snapshots_dir,
                snapshot_config.snapshot_version,
                bank_snapshot_package,
                snapshot_storages.as_slice(),
                false,
            );
            if let Err(err) = result {
                warn!(
                    "Failed to serialize bank '{}': {err}",
                    snapshot_config.bank_snapshots_dir.as_path().display(),
                );
                // If serializing the bank fails, we do *NOT* want to write
                // the mark_bank_snapshot_as_loadable file, so return early.
                return;
            }
            info!("Serializing bank snapshot... Done in {:?}", start.elapsed());
        }

        info!("Flushing account storages...");
        let start = Instant::now();
        for storage in &snapshot_storages {
            let result = storage.flush();
            if let Err(err) = result {
                warn!(
                    "Failed to flush account storage '{}': {err}",
                    storage.path().display(),
                );
                // If flushing a storage failed, we do *NOT* want to write
                // the "storages flushed" file, so return early.
                return;
            }
        }
        info!("Flushing account storages... Done in {:?}", start.elapsed());

        let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(
            &snapshot_config.bank_snapshots_dir,
            snapshot_slot,
        );

        info!("Hard linking account storages...");
        let start = Instant::now();
        let result = snapshot_utils::hard_link_storages_to_snapshot(
            &bank_snapshot_dir,
            snapshot_slot,
            &snapshot_storages,
        );
        if let Err(err) = result {
            warn!("Failed to hard link account storages: {err}");
            // If hard linking the storages failed, we do *NOT* want to mark the bank snapshot as
            // loadable so return early.
            return;
        }
        info!(
            "Hard linking account storages... Done in {:?}",
            start.elapsed(),
        );

        info!("Saving obsolete accounts...");
        let start = Instant::now();
        let result = snapshot_utils::write_obsolete_accounts_to_snapshot(
            &bank_snapshot_dir,
            &snapshot_storages,
            snapshot_slot,
        );
        if let Err(err) = result {
            warn!("Failed to serialize obsolete accounts: {err}");
            // If serializing the obsolete accounts failed, we do *NOT* want to mark the bank snapshot
            // as loadable so return early.
            return;
        }
        info!("Saving obsolete accounts... Done in {:?}", start.elapsed());

        let result = snapshot_utils::mark_bank_snapshot_as_loadable(&bank_snapshot_dir);
        if let Err(err) = result {
            warn!("Failed to mark bank snapshot as loadable: {err}");
        }
    }
}

/// The state required to run `teardown()`
// Note, don't derive Debug, because we don't want to print out 432k+ `AccountStorageEntry`s...
struct TeardownState {
    /// The slot of the latest snapshot
    snapshot_slot: Slot,
    /// The storages of the latest snapshot
    snapshot_storages: Vec<Arc<AccountStorageEntry>>,
    /// For fastboot snapshots archiving is not required so serialization of the bank snapshot
    /// can be deferred until teardown. In this case `bank_snapshot_package` will be `Some` and
    /// during teardown the bank snapshot will be serialized to storage. For other snapshot types
    /// `bank_snapshot_package` will be `None` because the serialization would have already occurred
    /// when the snapshot archive was written.
    bank_snapshot_package: Option<BankSnapshotPackage>,
}