//! solana_ledger/bigtable_upload.rs
//!
//! Bulk upload of confirmed blocks from a local Blockstore into BigTable.

use {
    crate::blockstore::Blockstore,
    crossbeam_channel::{bounded, unbounded},
    log::*,
    solana_clock::Slot,
    solana_measure::measure::Measure,
    std::{
        cmp::{max, min},
        collections::HashSet,
        result::Result,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::{Duration, Instant},
    },
};
18
/// Tuning knobs for [`upload_confirmed_blocks`].
#[derive(Clone)]
pub struct ConfirmedBlockUploadConfig {
    /// When true, upload every slot found in Blockstore, even ones bigtable already has.
    pub force_reupload: bool,
    /// Upper bound on the number of slots a single call will consider; the
    /// candidate list is truncated to this length.
    pub max_num_slots_to_check: usize,
    /// Number of blockstore-reader threads spawned, and also the number of
    /// blocks uploaded concurrently per batch.
    pub num_blocks_to_upload_in_parallel: usize,
    /// Capacity of the bounded channel between readers and the uploader.
    pub block_read_ahead_depth: usize, // should always be >= `num_blocks_to_upload_in_parallel`
}
26
27impl Default for ConfirmedBlockUploadConfig {
28    fn default() -> Self {
29        let num_blocks_to_upload_in_parallel = num_cpus::get() / 2;
30        ConfirmedBlockUploadConfig {
31            force_reupload: false,
32            max_num_slots_to_check: num_blocks_to_upload_in_parallel * 4,
33            num_blocks_to_upload_in_parallel,
34            block_read_ahead_depth: num_blocks_to_upload_in_parallel * 2,
35        }
36    }
37}
38
/// Per-thread statistics returned by each blockstore loader thread when it
/// exits; aggregated by `upload_confirmed_blocks` for the final log line.
struct BlockstoreLoadStats {
    pub num_blocks_read: usize, // blocks successfully read from Blockstore
    pub elapsed: Duration,      // wallclock lifetime of the loader thread
}
43
44/// Uploads a range of blocks from a Blockstore to bigtable LedgerStorage
45/// Returns the Slot of the last block checked. If no blocks in the range `[staring_slot,
46/// ending_slot]` are found in Blockstore, this value is equal to `ending_slot`.
47pub async fn upload_confirmed_blocks(
48    blockstore: Arc<Blockstore>,
49    bigtable: solana_storage_bigtable::LedgerStorage,
50    starting_slot: Slot,
51    ending_slot: Slot,
52    config: ConfirmedBlockUploadConfig,
53    exit: Arc<AtomicBool>,
54) -> Result<Slot, Box<dyn std::error::Error>> {
55    let mut measure = Measure::start("entire upload");
56
57    info!("Loading ledger slots from {starting_slot} to {ending_slot}");
58    let blockstore_slots: Vec<_> = blockstore
59        .rooted_slot_iterator(starting_slot)
60        .map_err(|err| {
61            format!("Failed to load entries starting from slot {starting_slot}: {err:?}")
62        })?
63        .take_while(|slot| *slot <= ending_slot)
64        .collect();
65
66    if blockstore_slots.is_empty() {
67        warn!("Ledger has no slots from {starting_slot} to {ending_slot:?}");
68        return Ok(ending_slot);
69    }
70
71    let first_blockstore_slot = *blockstore_slots.first().unwrap();
72    let last_blockstore_slot = *blockstore_slots.last().unwrap();
73    info!(
74        "Found {} slots in the range ({}, {})",
75        blockstore_slots.len(),
76        first_blockstore_slot,
77        last_blockstore_slot,
78    );
79
80    // Gather the blocks that are already present in bigtable, by slot
81    let bigtable_slots = if !config.force_reupload {
82        let mut bigtable_slots = vec![];
83        info!(
84            "Loading list of bigtable blocks between slots {first_blockstore_slot} and \
85             {last_blockstore_slot}..."
86        );
87
88        let mut start_slot = first_blockstore_slot;
89        while start_slot <= last_blockstore_slot {
90            let mut next_bigtable_slots = loop {
91                let num_bigtable_blocks = min(1000, config.max_num_slots_to_check * 2);
92                match bigtable
93                    .get_confirmed_blocks(start_slot, num_bigtable_blocks)
94                    .await
95                {
96                    Ok(slots) => break slots,
97                    Err(err) => {
98                        error!("get_confirmed_blocks for {start_slot} failed: {err:?}");
99                        // Consider exponential backoff...
100                        tokio::time::sleep(Duration::from_secs(2)).await;
101                    }
102                }
103            };
104            if next_bigtable_slots.is_empty() {
105                break;
106            }
107            bigtable_slots.append(&mut next_bigtable_slots);
108            start_slot = bigtable_slots.last().unwrap() + 1;
109        }
110        bigtable_slots
111            .into_iter()
112            .filter(|slot| *slot <= last_blockstore_slot)
113            .collect::<Vec<_>>()
114    } else {
115        Vec::new()
116    };
117
118    // The blocks that still need to be uploaded is the difference between what's already in the
119    // bigtable and what's in blockstore...
120    let blocks_to_upload = {
121        let blockstore_slots = blockstore_slots.into_iter().collect::<HashSet<_>>();
122        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
123
124        let mut blocks_to_upload = blockstore_slots
125            .difference(&bigtable_slots)
126            .cloned()
127            .collect::<Vec<_>>();
128        blocks_to_upload.sort_unstable();
129        blocks_to_upload.truncate(config.max_num_slots_to_check);
130        blocks_to_upload
131    };
132
133    if blocks_to_upload.is_empty() {
134        info!(
135            "No blocks between {starting_slot} and {ending_slot} need to be uploaded to bigtable"
136        );
137        return Ok(ending_slot);
138    }
139    let last_slot = *blocks_to_upload.last().unwrap();
140    info!(
141        "{} blocks to be uploaded to the bucket in the range ({}, {})",
142        blocks_to_upload.len(),
143        blocks_to_upload.first().unwrap(),
144        last_slot
145    );
146
147    // Distribute the blockstore reading across a few background threads to speed up the bigtable uploading
148    let (loader_threads, receiver): (Vec<_>, _) = {
149        let exit = exit.clone();
150
151        let (sender, receiver) = bounded(config.block_read_ahead_depth);
152
153        let (slot_sender, slot_receiver) = unbounded();
154        blocks_to_upload
155            .into_iter()
156            .for_each(|b| slot_sender.send(b).unwrap());
157        drop(slot_sender);
158
159        (
160            (0..config.num_blocks_to_upload_in_parallel)
161                .map(|i| {
162                    let blockstore = blockstore.clone();
163                    let sender = sender.clone();
164                    let slot_receiver = slot_receiver.clone();
165                    let exit = exit.clone();
166                    std::thread::Builder::new()
167                        .name(format!("solBigTGetBlk{i:02}"))
168                        .spawn(move || {
169                            let start = Instant::now();
170                            let mut num_blocks_read = 0;
171
172                            while let Ok(slot) = slot_receiver.recv() {
173                                if exit.load(Ordering::Relaxed) {
174                                    break;
175                                }
176
177                                let _ = match blockstore.get_rooted_block_with_entries(slot, true) {
178                                    Ok(confirmed_block_with_entries) => {
179                                        num_blocks_read += 1;
180                                        sender.send((slot, Some(confirmed_block_with_entries)))
181                                    }
182                                    Err(err) => {
183                                        warn!(
184                                            "Failed to get load confirmed block from slot {slot}: \
185                                             {err:?}"
186                                        );
187                                        sender.send((slot, None))
188                                    }
189                                };
190                            }
191                            BlockstoreLoadStats {
192                                num_blocks_read,
193                                elapsed: start.elapsed(),
194                            }
195                        })
196                        .unwrap()
197                })
198                .collect(),
199            receiver,
200        )
201    };
202
203    let mut failures = 0;
204    use futures::stream::StreamExt;
205
206    let mut stream =
207        tokio_stream::iter(receiver.into_iter()).chunks(config.num_blocks_to_upload_in_parallel);
208
209    while let Some(blocks) = stream.next().await {
210        if exit.load(Ordering::Relaxed) {
211            break;
212        }
213
214        let mut measure_upload = Measure::start("Upload");
215        let mut num_blocks = blocks.len();
216        info!("Preparing the next {num_blocks} blocks for upload");
217
218        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
219            None => {
220                num_blocks -= 1;
221                None
222            }
223            Some(confirmed_block) => {
224                let bt = bigtable.clone();
225                Some(tokio::spawn(async move {
226                    bt.upload_confirmed_block_with_entries(slot, confirmed_block)
227                        .await
228                }))
229            }
230        });
231
232        for result in futures::future::join_all(uploads).await {
233            if let Err(err) = result {
234                error!("upload_confirmed_block() join failed: {err:?}");
235                failures += 1;
236            } else if let Err(err) = result.unwrap() {
237                error!("upload_confirmed_block() upload failed: {err:?}");
238                failures += 1;
239            }
240        }
241
242        measure_upload.stop();
243        info!("{measure_upload} for {num_blocks} blocks");
244    }
245
246    measure.stop();
247    info!("{measure}");
248
249    let blockstore_results = loader_threads.into_iter().map(|t| t.join());
250
251    let mut blockstore_num_blocks_read = 0;
252    let mut blockstore_load_wallclock = Duration::default();
253    let mut blockstore_errors = 0;
254
255    for r in blockstore_results {
256        match r {
257            Ok(stats) => {
258                blockstore_num_blocks_read += stats.num_blocks_read;
259                blockstore_load_wallclock = max(stats.elapsed, blockstore_load_wallclock);
260            }
261            Err(e) => {
262                error!("error joining blockstore thread: {e:?}");
263                blockstore_errors += 1;
264            }
265        }
266    }
267
268    info!(
269        "blockstore upload took {:?} for {} blocks ({:.2} blocks/s) errors: {}",
270        blockstore_load_wallclock,
271        blockstore_num_blocks_read,
272        blockstore_num_blocks_read as f64 / blockstore_load_wallclock.as_secs_f64(),
273        blockstore_errors
274    );
275
276    if failures > 0 {
277        Err(format!("Incomplete upload, {failures} operations failed").into())
278    } else {
279        Ok(last_slot)
280    }
281}