1mod snapshot_gossip_manager;
2use {
3 agave_snapshots::{
4 paths as snapshot_paths, snapshot_config::SnapshotConfig,
5 snapshot_hash::StartingSnapshotHashes,
6 },
7 snapshot_gossip_manager::SnapshotGossipManager,
8 solana_accounts_db::accounts_db::AccountStorageEntry,
9 solana_clock::Slot,
10 solana_gossip::cluster_info::ClusterInfo,
11 solana_measure::{meas_dur, measure::Measure, measure_us},
12 solana_perf::thread::renice_this_thread,
13 solana_runtime::{
14 accounts_background_service::PendingSnapshotPackages,
15 snapshot_controller::SnapshotController, snapshot_package::SnapshotPackage, snapshot_utils,
16 },
17 std::{
18 sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc, Mutex,
21 },
22 thread::{self, Builder, JoinHandle},
23 time::{Duration, Instant},
24 },
25};
26
/// Handle to the background thread that packages snapshots.
///
/// Created by [`SnapshotPackagerService::new`]; call [`SnapshotPackagerService::join`] to
/// wait for the thread to finish.
pub struct SnapshotPackagerService {
    /// Join handle of the `solSnapshotPkgr` thread spawned in `new()`.
    t_snapshot_packager: JoinHandle<()>,
}
30
31impl SnapshotPackagerService {
32 pub const NAME: &str = "SnapshotPackagerService";
33
34 const LOOP_LIMITER: Duration = Duration::from_millis(100);
36
37 pub fn new(
38 pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
39 starting_snapshot_hashes: Option<StartingSnapshotHashes>,
40 exit: Arc<AtomicBool>,
41 exit_backpressure: Option<Arc<AtomicBool>>,
42 cluster_info: Arc<ClusterInfo>,
43 snapshot_controller: Arc<SnapshotController>,
44 enable_gossip_push: bool,
45 ) -> Self {
46 let t_snapshot_packager = Builder::new()
47 .name("solSnapshotPkgr".to_string())
48 .spawn(move || {
49 if let Some(exit_backpressure) = &exit_backpressure {
50 exit_backpressure.store(true, Ordering::Relaxed);
51 }
52 info!("{} has started", Self::NAME);
53 let snapshot_config = snapshot_controller.snapshot_config();
54 renice_this_thread(snapshot_config.packager_thread_niceness_adj).unwrap();
55 let mut snapshot_gossip_manager = enable_gossip_push
56 .then(|| SnapshotGossipManager::new(cluster_info, starting_snapshot_hashes));
57
58 let mut teardown_state = None;
59 loop {
60 if exit.load(Ordering::Relaxed) {
61 if let Some(teardown_state) = &teardown_state {
62 info!("Received exit request, tearing down...");
63 let (_, dur) = meas_dur!(Self::teardown(
64 teardown_state,
65 snapshot_controller.snapshot_config(),
66 ));
67 info!("Teardown completed in {dur:?}.");
68 }
69 break;
70 }
71
72 let Some(snapshot_package) =
73 Self::get_next_snapshot_package(&pending_snapshot_packages)
74 else {
75 std::thread::sleep(Self::LOOP_LIMITER);
76 continue;
77 };
78 info!("handling snapshot package: {snapshot_package:?}");
79 let enqueued_time = snapshot_package.enqueued.elapsed();
80
81 let measure_handling = Measure::start("");
82 let snapshot_kind = snapshot_package.snapshot_kind;
83 let snapshot_slot = snapshot_package.slot;
84 let snapshot_hash = snapshot_package.hash;
85
86 if exit_backpressure.is_some() {
87 teardown_state = Some(TeardownState {
91 snapshot_slot: snapshot_package.slot,
92 snapshot_storages: snapshot_package.snapshot_storages.clone(),
93 });
94 }
95
96 let (archive_result, archive_time_us) =
100 measure_us!(snapshot_utils::serialize_and_archive_snapshot_package(
101 snapshot_package,
102 snapshot_config,
103 exit_backpressure.is_none(),
106 ));
107 if let Err(err) = archive_result {
108 error!(
109 "Stopping {}! Fatal error while archiving snapshot package: {err}",
110 Self::NAME,
111 );
112 exit.store(true, Ordering::Relaxed);
113 break;
114 }
115
116 if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
117 snapshot_gossip_manager
118 .push_snapshot_hash(snapshot_kind, (snapshot_slot, snapshot_hash));
119 }
120
121 let (_, purge_archives_time_us) =
122 measure_us!(snapshot_utils::purge_old_snapshot_archives(
123 &snapshot_config.full_snapshot_archives_dir,
124 &snapshot_config.incremental_snapshot_archives_dir,
125 snapshot_config.maximum_full_snapshot_archives_to_retain,
126 snapshot_config.maximum_incremental_snapshot_archives_to_retain,
127 ));
128
129 let (_, purge_bank_snapshots_time_us) =
134 measure_us!(snapshot_utils::purge_bank_snapshots_older_than_slot(
135 &snapshot_config.bank_snapshots_dir,
136 snapshot_slot,
137 ));
138
139 let handling_time_us = measure_handling.end_as_us();
140 datapoint_info!(
141 "snapshot_packager_service",
142 ("enqueued_time_us", enqueued_time.as_micros(), i64),
143 ("handling_time_us", handling_time_us, i64),
144 ("archive_time_us", archive_time_us, i64),
145 (
146 "purge_old_snapshots_time_us",
147 purge_bank_snapshots_time_us,
148 i64
149 ),
150 ("purge_old_archives_time_us", purge_archives_time_us, i64),
151 );
152 }
153 info!("{} has stopped", Self::NAME);
154 if let Some(exit_backpressure) = &exit_backpressure {
155 exit_backpressure.store(false, Ordering::Relaxed);
156 }
157 })
158 .unwrap();
159
160 Self {
161 t_snapshot_packager,
162 }
163 }
164
165 pub fn join(self) -> thread::Result<()> {
166 self.t_snapshot_packager.join()
167 }
168
169 fn get_next_snapshot_package(
171 pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
172 ) -> Option<SnapshotPackage> {
173 pending_snapshot_packages.lock().unwrap().pop()
174 }
175
176 fn teardown(state: &TeardownState, snapshot_config: &SnapshotConfig) {
178 info!("Flushing account storages...");
179 let start = Instant::now();
180 for storage in &state.snapshot_storages {
181 let result = storage.flush();
182 if let Err(err) = result {
183 warn!(
184 "Failed to flush account storage '{}': {err}",
185 storage.path().display(),
186 );
187 return;
190 }
191 }
192 info!("Flushing account storages... Done in {:?}", start.elapsed());
193
194 let bank_snapshot_dir = snapshot_paths::get_bank_snapshot_dir(
195 &snapshot_config.bank_snapshots_dir,
196 state.snapshot_slot,
197 );
198
199 info!("Hard linking account storages...");
200 let start = Instant::now();
201 let result = snapshot_utils::hard_link_storages_to_snapshot(
202 &bank_snapshot_dir,
203 state.snapshot_slot,
204 &state.snapshot_storages,
205 );
206 if let Err(err) = result {
207 warn!("Failed to hard link account storages: {err}");
208 return;
211 }
212 info!(
213 "Hard linking account storages... Done in {:?}",
214 start.elapsed(),
215 );
216
217 info!("Saving obsolete accounts...");
218 let start = Instant::now();
219 let result = snapshot_utils::write_obsolete_accounts_to_snapshot(
220 &bank_snapshot_dir,
221 &state.snapshot_storages,
222 state.snapshot_slot,
223 );
224 if let Err(err) = result {
225 warn!("Failed to serialize obsolete accounts: {err}");
226 return;
229 }
230 info!("Saving obsolete accounts... Done in {:?}", start.elapsed());
231
232 let result = snapshot_utils::mark_bank_snapshot_as_loadable(&bank_snapshot_dir);
233 if let Err(err) = result {
234 warn!("Failed to mark bank snapshot as loadable: {err}");
235 }
236 }
237}
238
/// The state needed to run `SnapshotPackagerService::teardown()`.
///
/// Recorded by the service thread, when exit backpressure is in use, for each snapshot
/// package it picks up; a later package replaces the state of an earlier one.
// NOTE(review): `Debug` is not derived here; presumably intentional because the storages
// vector can be very large to print. Confirm before adding a derive.
struct TeardownState {
    /// The slot of the snapshot package this state was recorded from.
    snapshot_slot: Slot,
    /// The account storages of that same snapshot package.
    snapshot_storages: Vec<Arc<AccountStorageEntry>>,
}