solana_runtime/
snapshot_controller.rs1use {
2 crate::{
3 accounts_background_service::{
4 SnapshotRequest, SnapshotRequestKind, SnapshotRequestSender,
5 },
6 bank::{Bank, SquashTiming},
7 },
8 agave_snapshots::{snapshot_config::SnapshotConfig, SnapshotInterval},
9 log::*,
10 solana_clock::Slot,
11 solana_measure::measure::Measure,
12 std::{
13 sync::{
14 atomic::{AtomicU64, Ordering},
15 Arc,
16 },
17 time::Instant,
18 },
19};
20
21struct SnapshotGenerationIntervals {
22 full_snapshot_interval: SnapshotInterval,
23 incremental_snapshot_interval: SnapshotInterval,
24}
25
26pub struct SnapshotController {
27 abs_request_sender: SnapshotRequestSender,
28 snapshot_config: SnapshotConfig,
29 latest_abs_request_slot: AtomicU64,
30}
31
32impl SnapshotController {
33 pub fn new(
34 abs_request_sender: SnapshotRequestSender,
35 snapshot_config: SnapshotConfig,
36 root_slot: Slot,
37 ) -> Self {
38 Self {
39 abs_request_sender,
40 snapshot_config,
41 latest_abs_request_slot: AtomicU64::new(root_slot),
42 }
43 }
44
45 pub fn snapshot_config(&self) -> &SnapshotConfig {
46 &self.snapshot_config
47 }
48
49 pub fn request_sender(&self) -> &SnapshotRequestSender {
50 &self.abs_request_sender
51 }
52
53 fn latest_abs_request_slot(&self) -> Slot {
54 self.latest_abs_request_slot.load(Ordering::Relaxed)
55 }
56
57 fn set_latest_abs_request_slot(&self, slot: Slot) {
58 self.latest_abs_request_slot.store(slot, Ordering::Relaxed);
59 }
60
61 pub fn handle_new_roots(&self, root: Slot, banks: &[&Arc<Bank>]) -> (bool, SquashTiming, u64) {
62 let mut is_root_bank_squashed = false;
63 let mut squash_timing = SquashTiming::default();
64 let mut total_snapshot_ms = 0;
65
66 if let Some(SnapshotGenerationIntervals {
67 full_snapshot_interval,
68 incremental_snapshot_interval,
69 }) = self.snapshot_generation_intervals()
70 {
71 if let Some((bank, request_kind)) = banks.iter().find_map(|bank| {
72 let should_request_full_snapshot =
73 if let SnapshotInterval::Slots(snapshot_interval) = full_snapshot_interval {
74 bank.block_height() % snapshot_interval == 0
75 } else {
76 false
77 };
78 let should_request_incremental_snapshot =
79 if let SnapshotInterval::Slots(snapshot_interval) =
80 incremental_snapshot_interval
81 {
82 bank.block_height() % snapshot_interval == 0
83 } else {
84 false
85 };
86
87 if bank.slot() <= self.latest_abs_request_slot() {
88 None
89 } else if should_request_full_snapshot {
90 Some((bank, SnapshotRequestKind::FullSnapshot))
91 } else if should_request_incremental_snapshot {
92 Some((bank, SnapshotRequestKind::IncrementalSnapshot))
93 } else {
94 None
95 }
96 }) {
97 let bank_slot = bank.slot();
98 self.set_latest_abs_request_slot(bank_slot);
99 squash_timing += bank.squash();
100
101 is_root_bank_squashed = bank_slot == root;
102
103 let mut snapshot_time = Measure::start("squash::snapshot_time");
104 let status_cache_slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
107 if let Err(e) = self.abs_request_sender.send(SnapshotRequest {
108 snapshot_root_bank: Arc::clone(bank),
109 status_cache_slot_deltas,
110 request_kind,
111 enqueued: Instant::now(),
112 }) {
113 warn!("Error sending snapshot request for bank: {bank_slot}, err: {e:?}");
114 }
115 snapshot_time.stop();
116 total_snapshot_ms += snapshot_time.as_ms();
117 }
118 }
119
120 (is_root_bank_squashed, squash_timing, total_snapshot_ms)
121 }
122
123 fn snapshot_generation_intervals(&self) -> Option<SnapshotGenerationIntervals> {
128 self.snapshot_config
129 .should_generate_snapshots()
130 .then_some(SnapshotGenerationIntervals {
131 full_snapshot_interval: self.snapshot_config.full_snapshot_archive_interval,
132 incremental_snapshot_interval: self
133 .snapshot_config
134 .incremental_snapshot_archive_interval,
135 })
136 }
137}