solana_runtime/snapshot_utils/snapshot_storage_rebuilder.rs

1//! Provides interfaces for rebuilding snapshot storages
2
3use {
4    super::{snapshot_version_from_file, SnapshotError, SnapshotFrom, SnapshotVersion},
5    crate::serde_snapshot::{
6        self, reconstruct_single_storage, remap_and_reconstruct_single_storage,
7        snapshot_storage_lengths_from_fields, SerializedAccountsFileId,
8    },
9    crossbeam_channel::{select, unbounded, Receiver, Sender},
10    dashmap::DashMap,
11    log::*,
12    rayon::{
13        iter::{IntoParallelIterator, ParallelIterator},
14        ThreadPool, ThreadPoolBuilder,
15    },
16    regex::Regex,
17    solana_accounts_db::{
18        account_storage::{AccountStorageMap, AccountStorageReference},
19        accounts_db::{AccountStorageEntry, AccountsFileId, AtomicAccountsFileId},
20        accounts_file::StorageAccess,
21    },
22    solana_sdk::clock::Slot,
23    std::{
24        collections::HashMap,
25        fs::File,
26        io::{BufReader, Error as IoError},
27        path::PathBuf,
28        str::FromStr as _,
29        sync::{
30            atomic::{AtomicUsize, Ordering},
31            Arc, Mutex,
32        },
33        time::Instant,
34    },
35};
36
37lazy_static! {
38    static ref VERSION_FILE_REGEX: Regex = Regex::new(r"^version$").unwrap();
39    static ref BANK_FIELDS_FILE_REGEX: Regex = Regex::new(r"^[0-9]+(\.pre)?$").unwrap();
40}
41
42/// Convenient wrapper for snapshot version and rebuilt storages
43pub(crate) struct RebuiltSnapshotStorage {
44    /// Snapshot version
45    pub snapshot_version: SnapshotVersion,
46    /// Rebuilt storages
47    pub storage: AccountStorageMap,
48}
49
50/// Stores state for rebuilding snapshot storages
51#[derive(Debug)]
52pub(crate) struct SnapshotStorageRebuilder {
53    /// Receiver for unpacked snapshot storage files
54    file_receiver: Receiver<PathBuf>,
55    /// Number of threads to rebuild with
56    num_threads: usize,
57    /// Snapshot storage lengths - from the snapshot file
58    snapshot_storage_lengths: HashMap<Slot, HashMap<SerializedAccountsFileId, usize>>,
59    /// Container for storing snapshot file paths
60    storage_paths: DashMap<Slot, Mutex<Vec<PathBuf>>>,
61    /// Container for storing rebuilt snapshot storages
62    storage: AccountStorageMap,
63    /// Tracks next append_vec_id
64    next_append_vec_id: Arc<AtomicAccountsFileId>,
65    /// Tracker for number of processed slots
66    processed_slot_count: AtomicUsize,
67    /// Tracks the number of collisions in AccountsFileId
68    num_collisions: AtomicUsize,
69    /// Rebuild from the snapshot files or archives
70    snapshot_from: SnapshotFrom,
71    /// specify how storages are accessed
72    storage_access: StorageAccess,
73}
74
75impl SnapshotStorageRebuilder {
76    /// Synchronously spawns threads to rebuild snapshot storages
77    pub(crate) fn rebuild_storage(
78        file_receiver: Receiver<PathBuf>,
79        num_threads: usize,
80        next_append_vec_id: Arc<AtomicAccountsFileId>,
81        snapshot_from: SnapshotFrom,
82        storage_access: StorageAccess,
83    ) -> Result<RebuiltSnapshotStorage, SnapshotError> {
84        let (snapshot_version_path, snapshot_file_path, append_vec_files) =
85            Self::get_version_and_snapshot_files(&file_receiver);
86        let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
87        let snapshot_version = snapshot_version_str.parse().map_err(|err| {
88            IoError::other(format!(
89                "unsupported snapshot version '{snapshot_version_str}': {err}",
90            ))
91        })?;
92        let snapshot_storage_lengths =
93            Self::process_snapshot_file(snapshot_version, snapshot_file_path)?;
94
95        let account_storage_map = Self::spawn_rebuilder_threads(
96            file_receiver,
97            num_threads,
98            next_append_vec_id,
99            snapshot_storage_lengths,
100            append_vec_files,
101            snapshot_from,
102            storage_access,
103        )?;
104
105        Ok(RebuiltSnapshotStorage {
106            snapshot_version,
107            storage: account_storage_map,
108        })
109    }
110
111    /// Create the SnapshotStorageRebuilder for storing state during rebuilding
112    ///     - pre-allocates data for storage paths
113    fn new(
114        file_receiver: Receiver<PathBuf>,
115        num_threads: usize,
116        next_append_vec_id: Arc<AtomicAccountsFileId>,
117        snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
118        snapshot_from: SnapshotFrom,
119        storage_access: StorageAccess,
120    ) -> Self {
121        let storage = DashMap::with_capacity(snapshot_storage_lengths.len());
122        let storage_paths: DashMap<_, _> = snapshot_storage_lengths
123            .iter()
124            .map(|(slot, storage_lengths)| {
125                (*slot, Mutex::new(Vec::with_capacity(storage_lengths.len())))
126            })
127            .collect();
128        Self {
129            file_receiver,
130            num_threads,
131            snapshot_storage_lengths,
132            storage_paths,
133            storage,
134            next_append_vec_id,
135            processed_slot_count: AtomicUsize::new(0),
136            num_collisions: AtomicUsize::new(0),
137            snapshot_from,
138            storage_access,
139        }
140    }
141
142    /// Waits for snapshot file
143    /// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
144    /// This function will push append_vec files into a buffer until we receive the snapshot file
145    fn get_version_and_snapshot_files(
146        file_receiver: &Receiver<PathBuf>,
147    ) -> (PathBuf, PathBuf, Vec<PathBuf>) {
148        let mut append_vec_files = Vec::with_capacity(1024);
149        let mut snapshot_version_path = None;
150        let mut snapshot_file_path = None;
151
152        loop {
153            if let Ok(path) = file_receiver.recv() {
154                let filename = path.file_name().unwrap().to_str().unwrap();
155                match get_snapshot_file_kind(filename) {
156                    Some(SnapshotFileKind::Version) => {
157                        snapshot_version_path = Some(path);
158
159                        // break if we have both the snapshot file and the version file
160                        if snapshot_file_path.is_some() {
161                            break;
162                        }
163                    }
164                    Some(SnapshotFileKind::BankFields) => {
165                        snapshot_file_path = Some(path);
166
167                        // break if we have both the snapshot file and the version file
168                        if snapshot_version_path.is_some() {
169                            break;
170                        }
171                    }
172                    Some(SnapshotFileKind::Storage) => {
173                        append_vec_files.push(path);
174                    }
175                    None => {} // do nothing for other kinds of files
176                }
177            } else {
178                panic!("did not receive snapshot file from unpacking threads");
179            }
180        }
181        let snapshot_version_path = snapshot_version_path.unwrap();
182        let snapshot_file_path = snapshot_file_path.unwrap();
183
184        (snapshot_version_path, snapshot_file_path, append_vec_files)
185    }
186
187    /// Process the snapshot file to get the size of each snapshot storage file
188    fn process_snapshot_file(
189        snapshot_version: SnapshotVersion,
190        snapshot_file_path: PathBuf,
191    ) -> Result<HashMap<Slot, HashMap<usize, usize>>, bincode::Error> {
192        let snapshot_file = File::open(snapshot_file_path).unwrap();
193        let mut snapshot_stream = BufReader::new(snapshot_file);
194        match snapshot_version {
195            SnapshotVersion::V1_2_0 => {
196                let (_bank_fields, accounts_fields) =
197                    serde_snapshot::fields_from_stream(&mut snapshot_stream)?;
198
199                Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
200            }
201        }
202    }
203
204    /// Spawn threads for processing buffered append_vec_files, and then received files
205    fn spawn_rebuilder_threads(
206        file_receiver: Receiver<PathBuf>,
207        num_threads: usize,
208        next_append_vec_id: Arc<AtomicAccountsFileId>,
209        snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
210        append_vec_files: Vec<PathBuf>,
211        snapshot_from: SnapshotFrom,
212        storage_access: StorageAccess,
213    ) -> Result<AccountStorageMap, SnapshotError> {
214        let rebuilder = Arc::new(SnapshotStorageRebuilder::new(
215            file_receiver,
216            num_threads,
217            next_append_vec_id,
218            snapshot_storage_lengths,
219            snapshot_from,
220            storage_access,
221        ));
222
223        let thread_pool = rebuilder.build_thread_pool();
224
225        if snapshot_from == SnapshotFrom::Archive {
226            // Synchronously process buffered append_vec_files
227            thread_pool.install(|| rebuilder.process_buffered_files(append_vec_files))?;
228        }
229
230        // Asynchronously spawn threads to process received append_vec_files
231        let (exit_sender, exit_receiver) = unbounded();
232        for _ in 0..rebuilder.num_threads {
233            Self::spawn_receiver_thread(&thread_pool, exit_sender.clone(), rebuilder.clone());
234        }
235        drop(exit_sender); // drop otherwise loop below will never end
236
237        // wait for asynchronous threads to complete
238        rebuilder.wait_for_completion(exit_receiver)?;
239        Ok(Arc::try_unwrap(rebuilder).unwrap().storage)
240    }
241
242    /// Processes buffered append_vec_files
243    fn process_buffered_files(&self, append_vec_files: Vec<PathBuf>) -> Result<(), SnapshotError> {
244        append_vec_files
245            .into_par_iter()
246            .map(|path| self.process_append_vec_file(path))
247            .collect::<Result<(), SnapshotError>>()
248    }
249
250    /// Spawn a single thread to process received append_vec_files
251    fn spawn_receiver_thread(
252        thread_pool: &ThreadPool,
253        exit_sender: Sender<Result<(), SnapshotError>>,
254        rebuilder: Arc<SnapshotStorageRebuilder>,
255    ) {
256        thread_pool.spawn(move || {
257            for path in rebuilder.file_receiver.iter() {
258                match rebuilder.process_append_vec_file(path) {
259                    Ok(_) => {}
260                    Err(err) => {
261                        exit_sender
262                            .send(Err(err))
263                            .expect("sender should be connected");
264                        return;
265                    }
266                }
267            }
268
269            exit_sender
270                .send(Ok(()))
271                .expect("sender should be connected");
272        })
273    }
274
275    /// Process an append_vec_file
276    fn process_append_vec_file(&self, path: PathBuf) -> Result<(), SnapshotError> {
277        let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
278        if let Ok((slot, append_vec_id)) = get_slot_and_append_vec_id(&filename) {
279            if self.snapshot_from == SnapshotFrom::Dir {
280                // Keep track of the highest append_vec_id in the system, so the future append_vecs
281                // can be assigned to unique IDs.  This is only needed when loading from a snapshot
282                // dir.  When loading from a snapshot archive, the max of the appendvec IDs is
283                // updated in remap_append_vec_file(), which is not in the from_dir route.
284                self.next_append_vec_id
285                    .fetch_max((append_vec_id + 1) as AccountsFileId, Ordering::Relaxed);
286            }
287            let slot_storage_count = self.insert_storage_file(&slot, path);
288            if slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len() {
289                // slot_complete
290                self.process_complete_slot(slot)?;
291                self.processed_slot_count.fetch_add(1, Ordering::AcqRel);
292            }
293        }
294        Ok(())
295    }
296
297    /// Insert storage path into slot and return the number of storage files for the slot
298    fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize {
299        let slot_paths = self.storage_paths.get(slot).unwrap();
300        let mut lock = slot_paths.lock().unwrap();
301        lock.push(path);
302        lock.len()
303    }
304
305    /// Process a slot that has received all storage entries
306    fn process_complete_slot(&self, slot: Slot) -> Result<(), SnapshotError> {
307        let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
308        let lock = slot_storage_paths.lock().unwrap();
309
310        let slot_stores = lock
311            .iter()
312            .map(|path| {
313                let filename = path.file_name().unwrap().to_str().unwrap();
314                let (_, old_append_vec_id) = get_slot_and_append_vec_id(filename)?;
315                let current_len = *self
316                    .snapshot_storage_lengths
317                    .get(&slot)
318                    .unwrap()
319                    .get(&old_append_vec_id)
320                    .unwrap();
321
322                let storage_entry = match &self.snapshot_from {
323                    SnapshotFrom::Archive => remap_and_reconstruct_single_storage(
324                        slot,
325                        old_append_vec_id,
326                        current_len,
327                        path.as_path(),
328                        &self.next_append_vec_id,
329                        &self.num_collisions,
330                        self.storage_access,
331                    )?,
332                    SnapshotFrom::Dir => reconstruct_single_storage(
333                        &slot,
334                        path.as_path(),
335                        current_len,
336                        old_append_vec_id as AccountsFileId,
337                        self.storage_access,
338                    )?,
339                };
340
341                Ok((storage_entry.id(), storage_entry))
342            })
343            .collect::<Result<HashMap<AccountsFileId, Arc<AccountStorageEntry>>, SnapshotError>>(
344            )?;
345
346        if slot_stores.len() != 1 {
347            return Err(SnapshotError::RebuildStorages(format!(
348                "there must be exactly one storage per slot, but slot {slot} has {} storages",
349                slot_stores.len()
350            )));
351        }
352        // SAFETY: The check above guarantees there is one item in slot_stores,
353        // so `.next()` will always return `Some`
354        let (id, storage) = slot_stores.into_iter().next().unwrap();
355
356        self.storage
357            .insert(slot, AccountStorageReference { id, storage });
358        Ok(())
359    }
360
361    /// Wait for the completion of the rebuilding threads
362    fn wait_for_completion(
363        &self,
364        exit_receiver: Receiver<Result<(), SnapshotError>>,
365    ) -> Result<(), SnapshotError> {
366        let num_slots = self.snapshot_storage_lengths.len();
367        let mut last_log_time = Instant::now();
368        loop {
369            select! {
370                recv(exit_receiver) -> maybe_exit_signal => {
371                    match maybe_exit_signal {
372                        Ok(Ok(_)) => continue, // thread exited successfully
373                        Ok(Err(err)) => { // thread exited with error
374                            return Err(err);
375                        }
376                        Err(_) => break, // all threads have exited - channel disconnected
377                    }
378                }
379                default(std::time::Duration::from_millis(100)) => {
380                    let now = Instant::now();
381                    if now.duration_since(last_log_time).as_millis() >= 2000 {
382                        let num_processed_slots = self.processed_slot_count.load(Ordering::Relaxed);
383                        let num_collisions = self.num_collisions.load(Ordering::Relaxed);
384                        info!("rebuilt storages for {num_processed_slots}/{num_slots} slots with {num_collisions} collisions");
385                        last_log_time = now;
386                    }
387                }
388            }
389        }
390
391        Ok(())
392    }
393
394    /// Builds thread pool to rebuild with
395    fn build_thread_pool(&self) -> ThreadPool {
396        ThreadPoolBuilder::default()
397            .thread_name(|i| format!("solRbuildSnap{i:02}"))
398            .num_threads(self.num_threads)
399            .build()
400            .expect("new rayon threadpool")
401    }
402}
403
/// Kind of unpacked snapshot file, as judged from its file name alone
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
    /// The snapshot version file
    Version,
    /// The bank fields (snapshot) file
    BankFields,
    /// An account storage file named `<slot>.<id>`
    Storage,
}
411
412/// Determines `SnapshotFileKind` for `filename` if any
413fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
414    if VERSION_FILE_REGEX.is_match(filename) {
415        Some(SnapshotFileKind::Version)
416    } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
417        Some(SnapshotFileKind::BankFields)
418    } else if get_slot_and_append_vec_id(filename).is_ok() {
419        Some(SnapshotFileKind::Storage)
420    } else {
421        None
422    }
423}
424
425/// Get the slot and append vec id from the filename
426pub(crate) fn get_slot_and_append_vec_id(filename: &str) -> Result<(Slot, usize), SnapshotError> {
427    let mut parts = filename.splitn(2, '.');
428    let slot = parts.next().and_then(|s| Slot::from_str(s).ok());
429    let id = parts.next().and_then(|s| usize::from_str(s).ok());
430
431    slot.zip(id)
432        .ok_or_else(|| SnapshotError::InvalidAppendVecPath(PathBuf::from(filename)))
433}
434
#[cfg(test)]
mod tests {
    use {
        super::*, crate::snapshot_utils::SNAPSHOT_VERSION_FILENAME,
        solana_accounts_db::accounts_file::AccountsFile,
    };

    #[test]
    fn test_get_snapshot_file_kind() {
        assert_eq!(None, get_snapshot_file_kind("file.txt"));
        assert_eq!(None, get_snapshot_file_kind(""));
        assert_eq!(
            Some(SnapshotFileKind::Version),
            get_snapshot_file_kind(SNAPSHOT_VERSION_FILENAME)
        );
        assert_eq!(
            Some(SnapshotFileKind::BankFields),
            get_snapshot_file_kind("1234")
        );
        // bank fields files may carry a `.pre` suffix; they must not be taken for storages
        assert_eq!(
            Some(SnapshotFileKind::BankFields),
            get_snapshot_file_kind("1234.pre")
        );
        assert_eq!(
            Some(SnapshotFileKind::Storage),
            get_snapshot_file_kind("1000.999")
        );
    }

    #[test]
    fn test_get_slot_and_append_vec_id() {
        let expected_slot = 12345;
        let expected_id = 9987;
        let (slot, id) =
            get_slot_and_append_vec_id(&AccountsFile::file_name(expected_slot, expected_id))
                .unwrap();
        assert_eq!(expected_slot, slot);
        assert_eq!(expected_id as usize, id);
    }

    #[test]
    fn test_get_slot_and_append_vec_id_invalid() {
        for filename in ["", "1000", "1000.", ".999", "abc.999", "1000.abc", "1000.999.1"] {
            assert!(
                get_slot_and_append_vec_id(filename).is_err(),
                "'{filename}' should not parse as a storage file name"
            );
        }
    }
}