solana_runtime/
snapshot_controller.rs1use {
2 crate::{
3 accounts_background_service::{
4 SnapshotRequest, SnapshotRequestKind, SnapshotRequestSender,
5 },
6 bank::{epoch_accounts_hash_utils, Bank, SquashTiming},
7 bank_forks::SetRootError,
8 snapshot_config::SnapshotConfig,
9 },
10 log::*,
11 solana_clock::Slot,
12 solana_measure::measure::Measure,
13 std::{
14 sync::{
15 atomic::{AtomicU64, Ordering},
16 Arc,
17 },
18 time::Instant,
19 },
20};
21
22struct SnapshotGenerationIntervals {
23 full_snapshot_interval: Slot,
24 incremental_snapshot_interval: Slot,
25}
26
27pub struct SnapshotController {
28 abs_request_sender: SnapshotRequestSender,
29 snapshot_config: SnapshotConfig,
30 latest_abs_request_slot: AtomicU64,
31}
32
33impl SnapshotController {
34 pub fn new(
35 abs_request_sender: SnapshotRequestSender,
36 snapshot_config: SnapshotConfig,
37 root_slot: Slot,
38 ) -> Self {
39 Self {
40 abs_request_sender,
41 snapshot_config,
42 latest_abs_request_slot: AtomicU64::new(root_slot),
43 }
44 }
45
46 pub fn snapshot_config(&self) -> &SnapshotConfig {
47 &self.snapshot_config
48 }
49
50 pub fn request_sender(&self) -> &SnapshotRequestSender {
51 &self.abs_request_sender
52 }
53
54 fn latest_abs_request_slot(&self) -> Slot {
55 self.latest_abs_request_slot.load(Ordering::Relaxed)
56 }
57
58 fn set_latest_abs_request_slot(&self, slot: Slot) {
59 self.latest_abs_request_slot.store(slot, Ordering::Relaxed);
60 }
61
62 pub fn handle_new_roots(
63 &self,
64 root: Slot,
65 banks: &[&Arc<Bank>],
66 ) -> Result<(bool, SquashTiming, u64), SetRootError> {
67 let (mut is_root_bank_squashed, mut squash_timing) =
68 self.send_eah_request_if_needed(root, banks)?;
69 let mut total_snapshot_ms = 0;
70
71 if let Some(SnapshotGenerationIntervals {
78 full_snapshot_interval,
79 incremental_snapshot_interval,
80 }) = self.snapshot_generation_intervals()
81 {
82 if let Some((bank, request_kind)) = banks.iter().find_map(|bank| {
83 if bank.slot() <= self.latest_abs_request_slot() {
84 None
85 } else if bank.block_height() % full_snapshot_interval == 0 {
86 Some((bank, SnapshotRequestKind::FullSnapshot))
87 } else if bank.block_height() % incremental_snapshot_interval == 0 {
88 Some((bank, SnapshotRequestKind::IncrementalSnapshot))
89 } else {
90 None
91 }
92 }) {
93 let bank_slot = bank.slot();
94 self.set_latest_abs_request_slot(bank_slot);
95 squash_timing += bank.squash();
96
97 is_root_bank_squashed = bank_slot == root;
98
99 let mut snapshot_time = Measure::start("squash::snapshot_time");
100 if bank.is_startup_verification_complete() {
101 let status_cache_slot_deltas =
104 bank.status_cache.read().unwrap().root_slot_deltas();
105 if let Err(e) = self.abs_request_sender.send(SnapshotRequest {
106 snapshot_root_bank: Arc::clone(bank),
107 status_cache_slot_deltas,
108 request_kind,
109 enqueued: Instant::now(),
110 }) {
111 warn!(
112 "Error sending snapshot request for bank: {}, err: {:?}",
113 bank_slot, e
114 );
115 }
116 } else {
117 info!("Not sending snapshot request for bank: {}, startup verification is incomplete", bank_slot);
118 }
119 snapshot_time.stop();
120 total_snapshot_ms += snapshot_time.as_ms();
121 }
122 }
123
124 Ok((is_root_bank_squashed, squash_timing, total_snapshot_ms))
125 }
126
127 fn snapshot_generation_intervals(&self) -> Option<SnapshotGenerationIntervals> {
132 self.snapshot_config
133 .should_generate_snapshots()
134 .then_some(SnapshotGenerationIntervals {
135 full_snapshot_interval: self.snapshot_config.full_snapshot_archive_interval_slots,
136 incremental_snapshot_interval: self
137 .snapshot_config
138 .incremental_snapshot_archive_interval_slots,
139 })
140 }
141
142 pub fn send_eah_request_if_needed(
147 &self,
148 root: Slot,
149 banks: &[&Arc<Bank>],
150 ) -> Result<(bool, SquashTiming), SetRootError> {
151 let mut is_root_bank_squashed = false;
152 let mut squash_timing = SquashTiming::default();
153
154 let eah_banks: Vec<_> = banks
160 .iter()
161 .filter(|bank| self.should_request_epoch_accounts_hash(bank))
162 .collect();
163 assert!(
164 eah_banks.len() <= 1,
165 "At most one bank should request an epoch accounts hash calculation! num banks: {}, bank slots: {:?}",
166 eah_banks.len(),
167 eah_banks.iter().map(|bank| bank.slot()).collect::<Vec<_>>(),
168 );
169 if let Some(&&eah_bank) = eah_banks.first() {
170 debug!(
171 "sending epoch accounts hash request, slot: {}",
172 eah_bank.slot(),
173 );
174
175 self.set_latest_abs_request_slot(eah_bank.slot());
176 squash_timing += eah_bank.squash();
177 is_root_bank_squashed = eah_bank.slot() == root;
178
179 eah_bank
180 .rc
181 .accounts
182 .accounts_db
183 .epoch_accounts_hash_manager
184 .set_in_flight(eah_bank.slot());
185
186 if let Err(err) = self.abs_request_sender.send(SnapshotRequest {
187 snapshot_root_bank: Arc::clone(eah_bank),
188 status_cache_slot_deltas: Vec::default(),
189 request_kind: SnapshotRequestKind::EpochAccountsHash,
190 enqueued: Instant::now(),
191 }) {
192 return Err(SetRootError::SendEpochAccountHashError(
193 eah_bank.slot(),
194 err,
195 ));
196 }
197 }
198
199 Ok((is_root_bank_squashed, squash_timing))
200 }
201
202 #[must_use]
204 fn should_request_epoch_accounts_hash(&self, bank: &Bank) -> bool {
205 if !epoch_accounts_hash_utils::is_enabled_this_epoch(bank) {
206 return false;
207 }
208
209 let start_slot = epoch_accounts_hash_utils::calculation_start(bank);
210 bank.slot() > self.latest_abs_request_slot()
211 && bank.parent_slot() < start_slot
212 && bank.slot() >= start_slot
213 }
214}