// solana_ledger/bigtable_upload.rs
use {
    crate::blockstore::Blockstore,
    crossbeam_channel::{bounded, unbounded},
    log::*,
    solana_clock::Slot,
    solana_measure::measure::Measure,
    std::{
        cmp::{max, min},
        collections::HashSet,
        result::Result,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::{Duration, Instant},
    },
};
18
/// Tuning knobs for a single `upload_confirmed_blocks` run.
#[derive(Clone)]
pub struct ConfirmedBlockUploadConfig {
    /// When set, upload blocks even if BigTable already has them.
    pub force_reupload: bool,
    /// Upper bound on how many missing slots one invocation will process.
    pub max_num_slots_to_check: usize,
    /// Number of blockstore reader threads / concurrent upload tasks.
    pub num_blocks_to_upload_in_parallel: usize,
    /// Capacity of the reader-to-uploader channel, i.e. how far block
    /// loading may run ahead of uploading.
    pub block_read_ahead_depth: usize,
}
26
27impl Default for ConfirmedBlockUploadConfig {
28 fn default() -> Self {
29 let num_blocks_to_upload_in_parallel = num_cpus::get() / 2;
30 ConfirmedBlockUploadConfig {
31 force_reupload: false,
32 max_num_slots_to_check: num_blocks_to_upload_in_parallel * 4,
33 num_blocks_to_upload_in_parallel,
34 block_read_ahead_depth: num_blocks_to_upload_in_parallel * 2,
35 }
36 }
37}
38
/// Per-thread counters returned by each blockstore reader thread when it
/// finishes, aggregated after the upload loop completes.
struct BlockstoreLoadStats {
    /// How many blocks this thread successfully read from the blockstore.
    pub num_blocks_read: usize,
    /// Wall-clock lifetime of the reader thread.
    pub elapsed: Duration,
}
43
/// Uploads rooted blocks from the local `blockstore` to BigTable.
///
/// Walks rooted slots in `[starting_slot, ending_slot]`, subtracts the slots
/// BigTable already holds (unless `config.force_reupload`), then loads the
/// remaining blocks on background OS threads and uploads them concurrently
/// as tokio tasks.
///
/// Returns the last slot that was queued for upload, or `ending_slot` when
/// nothing needed uploading. Errors if the rooted-slot iterator cannot be
/// created or if any individual upload fails.
///
/// `exit` is polled cooperatively between work items; setting it aborts the
/// run early.
pub async fn upload_confirmed_blocks(
    blockstore: Arc<Blockstore>,
    bigtable: solana_storage_bigtable::LedgerStorage,
    starting_slot: Slot,
    ending_slot: Slot,
    config: ConfirmedBlockUploadConfig,
    exit: Arc<AtomicBool>,
) -> Result<Slot, Box<dyn std::error::Error>> {
    let mut measure = Measure::start("entire upload");

    info!("Loading ledger slots from {starting_slot} to {ending_slot}");
    let blockstore_slots: Vec<_> = blockstore
        .rooted_slot_iterator(starting_slot)
        .map_err(|err| {
            format!("Failed to load entries starting from slot {starting_slot}: {err:?}")
        })?
        .take_while(|slot| *slot <= ending_slot)
        .collect();

    if blockstore_slots.is_empty() {
        warn!("Ledger has no slots from {starting_slot} to {ending_slot:?}");
        return Ok(ending_slot);
    }

    let first_blockstore_slot = *blockstore_slots.first().unwrap();
    let last_blockstore_slot = *blockstore_slots.last().unwrap();
    info!(
        "Found {} slots in the range ({}, {})",
        blockstore_slots.len(),
        first_blockstore_slot,
        last_blockstore_slot,
    );

    // Unless forced, ask BigTable which slots in the range it already has so
    // they can be skipped below. Pages through `get_confirmed_blocks`,
    // retrying a failed page forever with a 2s backoff rather than aborting
    // the whole run.
    let bigtable_slots = if !config.force_reupload {
        let mut bigtable_slots = vec![];
        info!(
            "Loading list of bigtable blocks between slots {first_blockstore_slot} and \
             {last_blockstore_slot}..."
        );

        let mut start_slot = first_blockstore_slot;
        while start_slot <= last_blockstore_slot {
            let mut next_bigtable_slots = loop {
                let num_bigtable_blocks = min(1000, config.max_num_slots_to_check * 2);
                match bigtable
                    .get_confirmed_blocks(start_slot, num_bigtable_blocks)
                    .await
                {
                    Ok(slots) => break slots,
                    Err(err) => {
                        error!("get_confirmed_blocks for {start_slot} failed: {err:?}");
                        tokio::time::sleep(Duration::from_secs(2)).await;
                    }
                }
            };
            if next_bigtable_slots.is_empty() {
                // BigTable has no blocks at or beyond `start_slot`.
                break;
            }
            bigtable_slots.append(&mut next_bigtable_slots);
            start_slot = bigtable_slots.last().unwrap() + 1;
        }
        // The last page may extend past the range of interest; trim it.
        bigtable_slots
            .into_iter()
            .filter(|slot| *slot <= last_blockstore_slot)
            .collect::<Vec<_>>()
    } else {
        Vec::new()
    };

    // Slots to upload = (rooted slots in range) - (slots BigTable has),
    // sorted and capped at `max_num_slots_to_check` per invocation.
    let blocks_to_upload = {
        let blockstore_slots = blockstore_slots.into_iter().collect::<HashSet<_>>();
        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();

        let mut blocks_to_upload = blockstore_slots
            .difference(&bigtable_slots)
            .cloned()
            .collect::<Vec<_>>();
        blocks_to_upload.sort_unstable();
        blocks_to_upload.truncate(config.max_num_slots_to_check);
        blocks_to_upload
    };

    if blocks_to_upload.is_empty() {
        info!(
            "No blocks between {starting_slot} and {ending_slot} need to be uploaded to bigtable"
        );
        return Ok(ending_slot);
    }
    let last_slot = *blocks_to_upload.last().unwrap();
    info!(
        "{} blocks to be uploaded to the bucket in the range ({}, {})",
        blocks_to_upload.len(),
        blocks_to_upload.first().unwrap(),
        last_slot
    );

    // Spawn OS threads that read blocks out of the blockstore and feed them
    // through a bounded channel; the bound (`block_read_ahead_depth`) caps
    // how far the readers can run ahead of the async upload loop below.
    let (loader_threads, receiver): (Vec<_>, _) = {
        let exit = exit.clone();

        let (sender, receiver) = bounded(config.block_read_ahead_depth);

        // Pre-load every slot into an unbounded work queue that the reader
        // threads drain; dropping `slot_sender` makes each thread's `recv`
        // fail once the queue is empty, ending its loop.
        let (slot_sender, slot_receiver) = unbounded();
        blocks_to_upload
            .into_iter()
            .for_each(|b| slot_sender.send(b).unwrap());
        drop(slot_sender);

        (
            (0..config.num_blocks_to_upload_in_parallel)
                .map(|i| {
                    let blockstore = blockstore.clone();
                    let sender = sender.clone();
                    let slot_receiver = slot_receiver.clone();
                    let exit = exit.clone();
                    std::thread::Builder::new()
                        .name(format!("solBigTGetBlk{i:02}"))
                        .spawn(move || {
                            let start = Instant::now();
                            let mut num_blocks_read = 0;

                            while let Ok(slot) = slot_receiver.recv() {
                                if exit.load(Ordering::Relaxed) {
                                    break;
                                }

                                // Send failures (receiver dropped) are
                                // deliberately ignored via `let _`; the
                                // thread winds down on the next recv/exit
                                // check anyway.
                                let _ = match blockstore.get_rooted_block_with_entries(slot, true) {
                                    Ok(confirmed_block_with_entries) => {
                                        num_blocks_read += 1;
                                        sender.send((slot, Some(confirmed_block_with_entries)))
                                    }
                                    Err(err) => {
                                        warn!(
                                            "Failed to get load confirmed block from slot {slot}: \
                                             {err:?}"
                                        );
                                        // Forward `None` so the uploader can
                                        // account for the skipped slot.
                                        sender.send((slot, None))
                                    }
                                };
                            }
                            // Returned through `JoinHandle::join` and
                            // aggregated after the upload loop.
                            BlockstoreLoadStats {
                                num_blocks_read,
                                elapsed: start.elapsed(),
                            }
                        })
                        .unwrap()
                })
                .collect(),
            receiver,
        )
    };

    let mut failures = 0;
    use futures::stream::StreamExt;

    // Drain the reader channel in chunks of `num_blocks_to_upload_in_parallel`,
    // uploading each chunk's blocks concurrently as spawned tokio tasks.
    let mut stream =
        tokio_stream::iter(receiver.into_iter()).chunks(config.num_blocks_to_upload_in_parallel);

    while let Some(blocks) = stream.next().await {
        if exit.load(Ordering::Relaxed) {
            break;
        }

        let mut measure_upload = Measure::start("Upload");
        let mut num_blocks = blocks.len();
        info!("Preparing the next {num_blocks} blocks for upload");

        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
            None => {
                // This slot failed to load in the reader thread; skip it and
                // keep the per-chunk count honest for the log line below.
                num_blocks -= 1;
                None
            }
            Some(confirmed_block) => {
                let bt = bigtable.clone();
                Some(tokio::spawn(async move {
                    bt.upload_confirmed_block_with_entries(slot, confirmed_block)
                        .await
                }))
            }
        });

        for result in futures::future::join_all(uploads).await {
            // Outer Err: the spawned task itself failed (JoinError);
            // inner Err: the upload call returned an error. The `unwrap()`
            // in the second branch is safe because that branch only runs
            // when the outer result is Ok.
            if let Err(err) = result {
                error!("upload_confirmed_block() join failed: {err:?}");
                failures += 1;
            } else if let Err(err) = result.unwrap() {
                error!("upload_confirmed_block() upload failed: {err:?}");
                failures += 1;
            }
        }

        measure_upload.stop();
        info!("{measure_upload} for {num_blocks} blocks");
    }

    measure.stop();
    info!("{measure}");

    // Join the reader threads and aggregate their stats. Wall clock is the
    // max across threads since they ran concurrently.
    let blockstore_results = loader_threads.into_iter().map(|t| t.join());

    let mut blockstore_num_blocks_read = 0;
    let mut blockstore_load_wallclock = Duration::default();
    let mut blockstore_errors = 0;

    for r in blockstore_results {
        match r {
            Ok(stats) => {
                blockstore_num_blocks_read += stats.num_blocks_read;
                blockstore_load_wallclock = max(stats.elapsed, blockstore_load_wallclock);
            }
            Err(e) => {
                error!("error joining blockstore thread: {e:?}");
                blockstore_errors += 1;
            }
        }
    }

    info!(
        "blockstore upload took {:?} for {} blocks ({:.2} blocks/s) errors: {}",
        blockstore_load_wallclock,
        blockstore_num_blocks_read,
        blockstore_num_blocks_read as f64 / blockstore_load_wallclock.as_secs_f64(),
        blockstore_errors
    );

    // Any failed upload makes the whole run an error so the caller can retry
    // the range; otherwise report the last slot that was queued.
    if failures > 0 {
        Err(format!("Incomplete upload, {failures} operations failed").into())
    } else {
        Ok(last_slot)
    }
}