solana_runtime/
snapshot_controller.rs

1use {
2    crate::{
3        accounts_background_service::{
4            SnapshotRequest, SnapshotRequestKind, SnapshotRequestSender,
5        },
6        bank::{Bank, SquashTiming},
7        bank_forks::SetRootError,
8        snapshot_config::SnapshotConfig,
9        snapshot_utils::SnapshotInterval,
10    },
11    log::*,
12    solana_clock::Slot,
13    solana_measure::measure::Measure,
14    std::{
15        sync::{
16            atomic::{AtomicU64, Ordering},
17            Arc,
18        },
19        time::Instant,
20    },
21};
22
23struct SnapshotGenerationIntervals {
24    full_snapshot_interval: SnapshotInterval,
25    incremental_snapshot_interval: SnapshotInterval,
26}
27
28pub struct SnapshotController {
29    abs_request_sender: SnapshotRequestSender,
30    snapshot_config: SnapshotConfig,
31    latest_abs_request_slot: AtomicU64,
32}
33
34impl SnapshotController {
35    pub fn new(
36        abs_request_sender: SnapshotRequestSender,
37        snapshot_config: SnapshotConfig,
38        root_slot: Slot,
39    ) -> Self {
40        Self {
41            abs_request_sender,
42            snapshot_config,
43            latest_abs_request_slot: AtomicU64::new(root_slot),
44        }
45    }
46
47    pub fn snapshot_config(&self) -> &SnapshotConfig {
48        &self.snapshot_config
49    }
50
51    pub fn request_sender(&self) -> &SnapshotRequestSender {
52        &self.abs_request_sender
53    }
54
55    fn latest_abs_request_slot(&self) -> Slot {
56        self.latest_abs_request_slot.load(Ordering::Relaxed)
57    }
58
59    fn set_latest_abs_request_slot(&self, slot: Slot) {
60        self.latest_abs_request_slot.store(slot, Ordering::Relaxed);
61    }
62
63    pub fn handle_new_roots(
64        &self,
65        root: Slot,
66        banks: &[&Arc<Bank>],
67    ) -> Result<(bool, SquashTiming, u64), SetRootError> {
68        let mut is_root_bank_squashed = false;
69        let mut squash_timing = SquashTiming::default();
70        let mut total_snapshot_ms = 0;
71
72        if let Some(SnapshotGenerationIntervals {
73            full_snapshot_interval,
74            incremental_snapshot_interval,
75        }) = self.snapshot_generation_intervals()
76        {
77            if let Some((bank, request_kind)) = banks.iter().find_map(|bank| {
78                let should_request_full_snapshot =
79                    if let SnapshotInterval::Slots(snapshot_interval) = full_snapshot_interval {
80                        bank.block_height() % snapshot_interval == 0
81                    } else {
82                        false
83                    };
84                let should_request_incremental_snapshot =
85                    if let SnapshotInterval::Slots(snapshot_interval) =
86                        incremental_snapshot_interval
87                    {
88                        bank.block_height() % snapshot_interval == 0
89                    } else {
90                        false
91                    };
92
93                if bank.slot() <= self.latest_abs_request_slot() {
94                    None
95                } else if should_request_full_snapshot {
96                    Some((bank, SnapshotRequestKind::FullSnapshot))
97                } else if should_request_incremental_snapshot {
98                    Some((bank, SnapshotRequestKind::IncrementalSnapshot))
99                } else {
100                    None
101                }
102            }) {
103                let bank_slot = bank.slot();
104                self.set_latest_abs_request_slot(bank_slot);
105                squash_timing += bank.squash();
106
107                is_root_bank_squashed = bank_slot == root;
108
109                let mut snapshot_time = Measure::start("squash::snapshot_time");
110                if bank.has_initial_accounts_hash_verification_completed() {
111                    // Save off the status cache because these may get pruned if another
112                    // `set_root()` is called before the snapshots package can be generated
113                    let status_cache_slot_deltas =
114                        bank.status_cache.read().unwrap().root_slot_deltas();
115                    if let Err(e) = self.abs_request_sender.send(SnapshotRequest {
116                        snapshot_root_bank: Arc::clone(bank),
117                        status_cache_slot_deltas,
118                        request_kind,
119                        enqueued: Instant::now(),
120                    }) {
121                        warn!("Error sending snapshot request for bank: {bank_slot}, err: {e:?}");
122                    }
123                } else {
124                    info!(
125                        "Not sending snapshot request for bank: {bank_slot}, startup verification \
126                         is incomplete"
127                    );
128                }
129                snapshot_time.stop();
130                total_snapshot_ms += snapshot_time.as_ms();
131            }
132        }
133
134        Ok((is_root_bank_squashed, squash_timing, total_snapshot_ms))
135    }
136
137    /// Returns the intervals, in slots, for sending snapshot requests
138    ///
139    /// Returns None if snapshot generation is disabled and snapshot requests
140    /// should not be sent
141    fn snapshot_generation_intervals(&self) -> Option<SnapshotGenerationIntervals> {
142        self.snapshot_config
143            .should_generate_snapshots()
144            .then_some(SnapshotGenerationIntervals {
145                full_snapshot_interval: self.snapshot_config.full_snapshot_archive_interval,
146                incremental_snapshot_interval: self
147                    .snapshot_config
148                    .incremental_snapshot_archive_interval,
149            })
150    }
151}