solana_core/
snapshot_packager_service.rs

1mod snapshot_gossip_manager;
2use {
3    agave_snapshots::{
4        paths as snapshot_paths, snapshot_config::SnapshotConfig,
5        snapshot_hash::StartingSnapshotHashes,
6    },
7    snapshot_gossip_manager::SnapshotGossipManager,
8    solana_accounts_db::accounts_db::AccountStorageEntry,
9    solana_clock::Slot,
10    solana_gossip::cluster_info::ClusterInfo,
11    solana_measure::{meas_dur, measure::Measure, measure_us},
12    solana_perf::thread::renice_this_thread,
13    solana_runtime::{
14        accounts_background_service::PendingSnapshotPackages,
15        snapshot_controller::SnapshotController, snapshot_package::SnapshotPackage, snapshot_utils,
16    },
17    std::{
18        sync::{
19            atomic::{AtomicBool, Ordering},
20            Arc, Mutex,
21        },
22        thread::{self, Builder, JoinHandle},
23        time::{Duration, Instant},
24    },
25};
26
/// Handle to the background thread that archives snapshot packages.
///
/// Create it with [`SnapshotPackagerService::new`] and wait for the thread to finish with
/// [`SnapshotPackagerService::join`].
pub struct SnapshotPackagerService {
    /// Join handle of the packager thread spawned by `new()`.
    t_snapshot_packager: thread::JoinHandle<()>,
}
30
31impl SnapshotPackagerService {
32    pub const NAME: &str = "SnapshotPackagerService";
33
34    /// If there are no snapshot packages to handle, limit how often we re-check
35    const LOOP_LIMITER: Duration = Duration::from_millis(100);
36
37    pub fn new(
38        pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
39        starting_snapshot_hashes: Option<StartingSnapshotHashes>,
40        exit: Arc<AtomicBool>,
41        exit_backpressure: Option<Arc<AtomicBool>>,
42        cluster_info: Arc<ClusterInfo>,
43        snapshot_controller: Arc<SnapshotController>,
44        enable_gossip_push: bool,
45    ) -> Self {
46        let t_snapshot_packager = Builder::new()
47            .name("solSnapshotPkgr".to_string())
48            .spawn(move || {
49                if let Some(exit_backpressure) = &exit_backpressure {
50                    exit_backpressure.store(true, Ordering::Relaxed);
51                }
52                info!("{} has started", Self::NAME);
53                let snapshot_config = snapshot_controller.snapshot_config();
54                renice_this_thread(snapshot_config.packager_thread_niceness_adj).unwrap();
55                let mut snapshot_gossip_manager = enable_gossip_push
56                    .then(|| SnapshotGossipManager::new(cluster_info, starting_snapshot_hashes));
57
58                let mut teardown_state = None;
59                loop {
60                    if exit.load(Ordering::Relaxed) {
61                        if let Some(teardown_state) = &teardown_state {
62                            info!("Received exit request, tearing down...");
63                            let (_, dur) = meas_dur!(Self::teardown(
64                                teardown_state,
65                                snapshot_controller.snapshot_config(),
66                            ));
67                            info!("Teardown completed in {dur:?}.");
68                        }
69                        break;
70                    }
71
72                    let Some(snapshot_package) =
73                        Self::get_next_snapshot_package(&pending_snapshot_packages)
74                    else {
75                        std::thread::sleep(Self::LOOP_LIMITER);
76                        continue;
77                    };
78                    info!("handling snapshot package: {snapshot_package:?}");
79                    let enqueued_time = snapshot_package.enqueued.elapsed();
80
81                    let measure_handling = Measure::start("");
82                    let snapshot_kind = snapshot_package.snapshot_kind;
83                    let snapshot_slot = snapshot_package.slot;
84                    let snapshot_hash = snapshot_package.hash;
85
86                    if exit_backpressure.is_some() {
87                        // With exit backpressure, we will delay flushing snapshot storages
88                        // until we receive a graceful exit request.
89                        // Save the snapshot storages here, so we can flush later (as needed).
90                        teardown_state = Some(TeardownState {
91                            snapshot_slot: snapshot_package.slot,
92                            snapshot_storages: snapshot_package.snapshot_storages.clone(),
93                        });
94                    }
95
96                    // Archiving the snapshot package is not allowed to fail.
97                    // AccountsBackgroundService calls `clean_accounts()` with a value for
98                    // latest_full_snapshot_slot that requires this archive call to succeed.
99                    let (archive_result, archive_time_us) =
100                        measure_us!(snapshot_utils::serialize_and_archive_snapshot_package(
101                            snapshot_package,
102                            snapshot_config,
103                            // Without exit backpressure, always flush the snapshot storages,
104                            // which is required for fastboot.
105                            exit_backpressure.is_none(),
106                        ));
107                    if let Err(err) = archive_result {
108                        error!(
109                            "Stopping {}! Fatal error while archiving snapshot package: {err}",
110                            Self::NAME,
111                        );
112                        exit.store(true, Ordering::Relaxed);
113                        break;
114                    }
115
116                    if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
117                        snapshot_gossip_manager
118                            .push_snapshot_hash(snapshot_kind, (snapshot_slot, snapshot_hash));
119                    }
120
121                    let (_, purge_archives_time_us) =
122                        measure_us!(snapshot_utils::purge_old_snapshot_archives(
123                            &snapshot_config.full_snapshot_archives_dir,
124                            &snapshot_config.incremental_snapshot_archives_dir,
125                            snapshot_config.maximum_full_snapshot_archives_to_retain,
126                            snapshot_config.maximum_incremental_snapshot_archives_to_retain,
127                        ));
128
129                    // Now that this snapshot package has been archived, it is safe to remove
130                    // all bank snapshots older than this slot.  We want to keep the bank
131                    // snapshot *at this slot* so that it can be used during restarts, when
132                    // booting from local state.
133                    let (_, purge_bank_snapshots_time_us) =
134                        measure_us!(snapshot_utils::purge_bank_snapshots_older_than_slot(
135                            &snapshot_config.bank_snapshots_dir,
136                            snapshot_slot,
137                        ));
138
139                    let handling_time_us = measure_handling.end_as_us();
140                    datapoint_info!(
141                        "snapshot_packager_service",
142                        ("enqueued_time_us", enqueued_time.as_micros(), i64),
143                        ("handling_time_us", handling_time_us, i64),
144                        ("archive_time_us", archive_time_us, i64),
145                        (
146                            "purge_old_snapshots_time_us",
147                            purge_bank_snapshots_time_us,
148                            i64
149                        ),
150                        ("purge_old_archives_time_us", purge_archives_time_us, i64),
151                    );
152                }
153                info!("{} has stopped", Self::NAME);
154                if let Some(exit_backpressure) = &exit_backpressure {
155                    exit_backpressure.store(false, Ordering::Relaxed);
156                }
157            })
158            .unwrap();
159
160        Self {
161            t_snapshot_packager,
162        }
163    }
164
165    pub fn join(self) -> thread::Result<()> {
166        self.t_snapshot_packager.join()
167    }
168
169    /// Returns the next snapshot package to handle
170    fn get_next_snapshot_package(
171        pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
172    ) -> Option<SnapshotPackage> {
173        pending_snapshot_packages.lock().unwrap().pop()
174    }
175
176    /// Performs final operations before gracefully shutting down
177    fn teardown(state: &TeardownState, snapshot_config: &SnapshotConfig) {
178        info!("Flushing account storages...");
179        let start = Instant::now();
180        for storage in &state.snapshot_storages {
181            let result = storage.flush();
182            if let Err(err) = result {
183                warn!(
184                    "Failed to flush account storage '{}': {err}",
185                    storage.path().display(),
186                );
187                // If flushing a storage failed, we do *NOT* want to write
188                // the "storages flushed" file, so return early.
189                return;
190            }
191        }
192        info!("Flushing account storages... Done in {:?}", start.elapsed());
193
194        let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(
195            &snapshot_config.bank_snapshots_dir,
196            state.snapshot_slot,
197        );
198
199        info!("Hard linking account storages...");
200        let start = Instant::now();
201        let result = snapshot_utils::hard_link_storages_to_snapshot(
202            &bank_snapshot_dir,
203            state.snapshot_slot,
204            &state.snapshot_storages,
205        );
206        if let Err(err) = result {
207            warn!("Failed to hard link account storages: {err}");
208            // If hard linking the storages failed, we do *NOT* want to mark the bank snapshot as
209            // loadable so return early.
210            return;
211        }
212        info!(
213            "Hard linking account storages... Done in {:?}",
214            start.elapsed(),
215        );
216
217        info!("Saving obsolete accounts...");
218        let start = Instant::now();
219        let result = snapshot_utils::write_obsolete_accounts_to_snapshot(
220            &bank_snapshot_dir,
221            &state.snapshot_storages,
222            state.snapshot_slot,
223        );
224        if let Err(err) = result {
225            warn!("Failed to serialize obsolete accounts: {err}");
226            // If serializing the obsolete accounts failed, we do *NOT* want to mark the bank snapshot
227            // as loadable so return early.
228            return;
229        }
230        info!("Saving obsolete accounts... Done in {:?}", start.elapsed());
231
232        let result = snapshot_utils::mark_bank_snapshot_as_loadable(&bank_snapshot_dir);
233        if let Err(err) = result {
234            warn!("Failed to mark bank snapshot as loadable: {err}");
235        }
236    }
237}
238
/// The state `SnapshotPackagerService::teardown()` needs to finish a graceful shutdown.
///
/// The packager loop records this only when exit backpressure is enabled. It holds the most
/// recently handled snapshot package, so that package's storages can be flushed and hard
/// linked at exit.
// Note, don't derive Debug, because we don't want to print out 432k+ `AccountStorageEntry`s...
struct TeardownState {
    /// The slot of the latest snapshot; selects the bank snapshot directory teardown writes to
    snapshot_slot: Slot,
    /// The storages of the latest snapshot; teardown flushes and hard links each of these
    snapshot_storages: Vec<Arc<AccountStorageEntry>>,
}