solana_runtime/
snapshot_controller.rs

1use {
2    crate::{
3        accounts_background_service::{
4            SnapshotRequest, SnapshotRequestKind, SnapshotRequestSender,
5        },
6        bank::{Bank, SquashTiming},
7    },
8    agave_snapshots::{snapshot_config::SnapshotConfig, SnapshotInterval},
9    log::*,
10    solana_clock::Slot,
11    solana_measure::measure::Measure,
12    std::{
13        sync::{
14            atomic::{AtomicU64, Ordering},
15            Arc,
16        },
17        time::Instant,
18    },
19};
20
21struct SnapshotGenerationIntervals {
22    full_snapshot_interval: SnapshotInterval,
23    incremental_snapshot_interval: SnapshotInterval,
24}
25
26pub struct SnapshotController {
27    abs_request_sender: SnapshotRequestSender,
28    snapshot_config: SnapshotConfig,
29    latest_abs_request_slot: AtomicU64,
30}
31
32impl SnapshotController {
33    pub fn new(
34        abs_request_sender: SnapshotRequestSender,
35        snapshot_config: SnapshotConfig,
36        root_slot: Slot,
37    ) -> Self {
38        Self {
39            abs_request_sender,
40            snapshot_config,
41            latest_abs_request_slot: AtomicU64::new(root_slot),
42        }
43    }
44
45    pub fn snapshot_config(&self) -> &SnapshotConfig {
46        &self.snapshot_config
47    }
48
49    pub fn request_sender(&self) -> &SnapshotRequestSender {
50        &self.abs_request_sender
51    }
52
53    fn latest_abs_request_slot(&self) -> Slot {
54        self.latest_abs_request_slot.load(Ordering::Relaxed)
55    }
56
57    fn set_latest_abs_request_slot(&self, slot: Slot) {
58        self.latest_abs_request_slot.store(slot, Ordering::Relaxed);
59    }
60
61    pub fn handle_new_roots(&self, root: Slot, banks: &[&Arc<Bank>]) -> (bool, SquashTiming, u64) {
62        let mut is_root_bank_squashed = false;
63        let mut squash_timing = SquashTiming::default();
64        let mut total_snapshot_ms = 0;
65
66        if let Some(SnapshotGenerationIntervals {
67            full_snapshot_interval,
68            incremental_snapshot_interval,
69        }) = self.snapshot_generation_intervals()
70        {
71            if let Some((bank, request_kind)) = banks.iter().find_map(|bank| {
72                let should_request_full_snapshot =
73                    if let SnapshotInterval::Slots(snapshot_interval) = full_snapshot_interval {
74                        bank.block_height() % snapshot_interval == 0
75                    } else {
76                        false
77                    };
78                let should_request_incremental_snapshot =
79                    if let SnapshotInterval::Slots(snapshot_interval) =
80                        incremental_snapshot_interval
81                    {
82                        bank.block_height() % snapshot_interval == 0
83                    } else {
84                        false
85                    };
86
87                if bank.slot() <= self.latest_abs_request_slot() {
88                    None
89                } else if should_request_full_snapshot {
90                    Some((bank, SnapshotRequestKind::FullSnapshot))
91                } else if should_request_incremental_snapshot {
92                    Some((bank, SnapshotRequestKind::IncrementalSnapshot))
93                } else {
94                    None
95                }
96            }) {
97                let bank_slot = bank.slot();
98                self.set_latest_abs_request_slot(bank_slot);
99                squash_timing += bank.squash();
100
101                is_root_bank_squashed = bank_slot == root;
102
103                let mut snapshot_time = Measure::start("squash::snapshot_time");
104                // Save off the status cache because these may get pruned if another
105                // `set_root()` is called before the snapshots package can be generated
106                let status_cache_slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
107                if let Err(e) = self.abs_request_sender.send(SnapshotRequest {
108                    snapshot_root_bank: Arc::clone(bank),
109                    status_cache_slot_deltas,
110                    request_kind,
111                    enqueued: Instant::now(),
112                }) {
113                    warn!("Error sending snapshot request for bank: {bank_slot}, err: {e:?}");
114                }
115                snapshot_time.stop();
116                total_snapshot_ms += snapshot_time.as_ms();
117            }
118        }
119
120        (is_root_bank_squashed, squash_timing, total_snapshot_ms)
121    }
122
123    /// Returns the intervals, in slots, for sending snapshot requests
124    ///
125    /// Returns None if snapshot generation is disabled and snapshot requests
126    /// should not be sent
127    fn snapshot_generation_intervals(&self) -> Option<SnapshotGenerationIntervals> {
128        self.snapshot_config
129            .should_generate_snapshots()
130            .then_some(SnapshotGenerationIntervals {
131                full_snapshot_interval: self.snapshot_config.full_snapshot_archive_interval,
132                incremental_snapshot_interval: self
133                    .snapshot_config
134                    .incremental_snapshot_archive_interval,
135            })
136    }
137}