solana_runtime/
snapshot_controller.rs

1use {
2    crate::{
3        accounts_background_service::{
4            SnapshotRequest, SnapshotRequestKind, SnapshotRequestSender,
5        },
6        bank::{epoch_accounts_hash_utils, Bank, SquashTiming},
7        bank_forks::SetRootError,
8        snapshot_config::SnapshotConfig,
9    },
10    log::*,
11    solana_clock::Slot,
12    solana_measure::measure::Measure,
13    std::{
14        sync::{
15            atomic::{AtomicU64, Ordering},
16            Arc,
17        },
18        time::Instant,
19    },
20};
21
/// The pair of intervals used to decide when a snapshot request should be sent.
///
/// Built from the snapshot config by `SnapshotController::snapshot_generation_intervals()`.
struct SnapshotGenerationIntervals {
    /// A full snapshot is requested for a bank whose block height is a multiple of this value.
    full_snapshot_interval: Slot,
    /// An incremental snapshot is requested for a bank whose block height is a multiple of this
    /// value (and is not also a multiple of the full snapshot interval).
    incremental_snapshot_interval: Slot,
}
26
/// Decides, as new roots are set, which banks should trigger snapshot or epoch accounts hash
/// requests, and sends those requests through a `SnapshotRequestSender`.
pub struct SnapshotController {
    /// Channel used to send `SnapshotRequest`s.
    abs_request_sender: SnapshotRequestSender,
    /// Snapshot configuration; supplies the snapshot intervals and whether snapshots are
    /// generated at all.
    snapshot_config: SnapshotConfig,
    /// Slot of the most recent bank a request was issued for (initially the root slot passed to
    /// `new()`).  Only banks with a strictly greater slot are considered for new requests.
    latest_abs_request_slot: AtomicU64,
}
32
33impl SnapshotController {
34    pub fn new(
35        abs_request_sender: SnapshotRequestSender,
36        snapshot_config: SnapshotConfig,
37        root_slot: Slot,
38    ) -> Self {
39        Self {
40            abs_request_sender,
41            snapshot_config,
42            latest_abs_request_slot: AtomicU64::new(root_slot),
43        }
44    }
45
46    pub fn snapshot_config(&self) -> &SnapshotConfig {
47        &self.snapshot_config
48    }
49
50    pub fn request_sender(&self) -> &SnapshotRequestSender {
51        &self.abs_request_sender
52    }
53
54    fn latest_abs_request_slot(&self) -> Slot {
55        self.latest_abs_request_slot.load(Ordering::Relaxed)
56    }
57
58    fn set_latest_abs_request_slot(&self, slot: Slot) {
59        self.latest_abs_request_slot.store(slot, Ordering::Relaxed);
60    }
61
62    pub fn handle_new_roots(
63        &self,
64        root: Slot,
65        banks: &[&Arc<Bank>],
66    ) -> Result<(bool, SquashTiming, u64), SetRootError> {
67        let (mut is_root_bank_squashed, mut squash_timing) =
68            self.send_eah_request_if_needed(root, banks)?;
69        let mut total_snapshot_ms = 0;
70
71        // After checking for EAH requests, also check for regular snapshot requests.
72        //
73        // This is needed when a snapshot request occurs in a slot after an EAH request, and is
74        // part of the same set of `banks` in a single `set_root()` invocation.  While (very)
75        // unlikely for a validator with default snapshot intervals (and accounts hash verifier
76        // intervals), it *is* possible, and there are tests to exercise this possibility.
77        if let Some(SnapshotGenerationIntervals {
78            full_snapshot_interval,
79            incremental_snapshot_interval,
80        }) = self.snapshot_generation_intervals()
81        {
82            if let Some((bank, request_kind)) = banks.iter().find_map(|bank| {
83                if bank.slot() <= self.latest_abs_request_slot() {
84                    None
85                } else if bank.block_height() % full_snapshot_interval == 0 {
86                    Some((bank, SnapshotRequestKind::FullSnapshot))
87                } else if bank.block_height() % incremental_snapshot_interval == 0 {
88                    Some((bank, SnapshotRequestKind::IncrementalSnapshot))
89                } else {
90                    None
91                }
92            }) {
93                let bank_slot = bank.slot();
94                self.set_latest_abs_request_slot(bank_slot);
95                squash_timing += bank.squash();
96
97                is_root_bank_squashed = bank_slot == root;
98
99                let mut snapshot_time = Measure::start("squash::snapshot_time");
100                if bank.is_startup_verification_complete() {
101                    // Save off the status cache because these may get pruned if another
102                    // `set_root()` is called before the snapshots package can be generated
103                    let status_cache_slot_deltas =
104                        bank.status_cache.read().unwrap().root_slot_deltas();
105                    if let Err(e) = self.abs_request_sender.send(SnapshotRequest {
106                        snapshot_root_bank: Arc::clone(bank),
107                        status_cache_slot_deltas,
108                        request_kind,
109                        enqueued: Instant::now(),
110                    }) {
111                        warn!(
112                            "Error sending snapshot request for bank: {}, err: {:?}",
113                            bank_slot, e
114                        );
115                    }
116                } else {
117                    info!("Not sending snapshot request for bank: {}, startup verification is incomplete", bank_slot);
118                }
119                snapshot_time.stop();
120                total_snapshot_ms += snapshot_time.as_ms();
121            }
122        }
123
124        Ok((is_root_bank_squashed, squash_timing, total_snapshot_ms))
125    }
126
127    /// Returns the intervals, in slots, for sending snapshot requests
128    ///
129    /// Returns None if snapshot generation is disabled and snapshot requests
130    /// should not be sent
131    fn snapshot_generation_intervals(&self) -> Option<SnapshotGenerationIntervals> {
132        self.snapshot_config
133            .should_generate_snapshots()
134            .then_some(SnapshotGenerationIntervals {
135                full_snapshot_interval: self.snapshot_config.full_snapshot_archive_interval_slots,
136                incremental_snapshot_interval: self
137                    .snapshot_config
138                    .incremental_snapshot_archive_interval_slots,
139            })
140    }
141
142    /// Sends an EpochAccountsHash request if one of the `banks` crosses the EAH boundary.
143    /// Returns if the bank at slot `root` was squashed, and its timings.
144    ///
145    /// Panics if more than one bank in `banks` should send an EAH request.
146    pub fn send_eah_request_if_needed(
147        &self,
148        root: Slot,
149        banks: &[&Arc<Bank>],
150    ) -> Result<(bool, SquashTiming), SetRootError> {
151        let mut is_root_bank_squashed = false;
152        let mut squash_timing = SquashTiming::default();
153
154        // Go through all the banks and see if we should send an EAH request.
155        // Only one EAH bank is allowed to send an EAH request.
156        // NOTE: Instead of filter-collect-assert, `.find()` could be used instead.
157        // Once sufficient testing guarantees only one bank will ever request an EAH,
158        // change to `.find()`.
159        let eah_banks: Vec<_> = banks
160            .iter()
161            .filter(|bank| self.should_request_epoch_accounts_hash(bank))
162            .collect();
163        assert!(
164            eah_banks.len() <= 1,
165            "At most one bank should request an epoch accounts hash calculation! num banks: {}, bank slots: {:?}",
166            eah_banks.len(),
167            eah_banks.iter().map(|bank| bank.slot()).collect::<Vec<_>>(),
168        );
169        if let Some(&&eah_bank) = eah_banks.first() {
170            debug!(
171                "sending epoch accounts hash request, slot: {}",
172                eah_bank.slot(),
173            );
174
175            self.set_latest_abs_request_slot(eah_bank.slot());
176            squash_timing += eah_bank.squash();
177            is_root_bank_squashed = eah_bank.slot() == root;
178
179            eah_bank
180                .rc
181                .accounts
182                .accounts_db
183                .epoch_accounts_hash_manager
184                .set_in_flight(eah_bank.slot());
185
186            if let Err(err) = self.abs_request_sender.send(SnapshotRequest {
187                snapshot_root_bank: Arc::clone(eah_bank),
188                status_cache_slot_deltas: Vec::default(),
189                request_kind: SnapshotRequestKind::EpochAccountsHash,
190                enqueued: Instant::now(),
191            }) {
192                return Err(SetRootError::SendEpochAccountHashError(
193                    eah_bank.slot(),
194                    err,
195                ));
196            }
197        }
198
199        Ok((is_root_bank_squashed, squash_timing))
200    }
201
202    /// Determine if this bank should request an epoch accounts hash
203    #[must_use]
204    fn should_request_epoch_accounts_hash(&self, bank: &Bank) -> bool {
205        if !epoch_accounts_hash_utils::is_enabled_this_epoch(bank) {
206            return false;
207        }
208
209        let start_slot = epoch_accounts_hash_utils::calculation_start(bank);
210        bank.slot() > self.latest_abs_request_slot()
211            && bank.parent_slot() < start_slot
212            && bank.slot() >= start_slot
213    }
214}