1use {
4 super::{snapshot_version_from_file, SnapshotError, SnapshotFrom, SnapshotVersion},
5 crate::serde_snapshot::{
6 self, reconstruct_single_storage, remap_and_reconstruct_single_storage,
7 snapshot_storage_lengths_from_fields, SerializedAccountsFileId,
8 },
9 crossbeam_channel::{select, unbounded, Receiver, Sender},
10 dashmap::DashMap,
11 log::*,
12 rayon::{
13 iter::{IntoParallelIterator, ParallelIterator},
14 ThreadPool, ThreadPoolBuilder,
15 },
16 regex::Regex,
17 solana_accounts_db::{
18 account_storage::{AccountStorageMap, AccountStorageReference},
19 accounts_db::{AccountStorageEntry, AccountsFileId, AtomicAccountsFileId},
20 accounts_file::StorageAccess,
21 },
22 solana_sdk::clock::Slot,
23 std::{
24 collections::HashMap,
25 fs::File,
26 io::{BufReader, Error as IoError},
27 path::PathBuf,
28 str::FromStr as _,
29 sync::{
30 atomic::{AtomicUsize, Ordering},
31 Arc, Mutex,
32 },
33 time::Instant,
34 },
35};
36
37lazy_static! {
38 static ref VERSION_FILE_REGEX: Regex = Regex::new(r"^version$").unwrap();
39 static ref BANK_FIELDS_FILE_REGEX: Regex = Regex::new(r"^[0-9]+(\.pre)?$").unwrap();
40}
41
42pub(crate) struct RebuiltSnapshotStorage {
44 pub snapshot_version: SnapshotVersion,
46 pub storage: AccountStorageMap,
48}
49
50#[derive(Debug)]
52pub(crate) struct SnapshotStorageRebuilder {
53 file_receiver: Receiver<PathBuf>,
55 num_threads: usize,
57 snapshot_storage_lengths: HashMap<Slot, HashMap<SerializedAccountsFileId, usize>>,
59 storage_paths: DashMap<Slot, Mutex<Vec<PathBuf>>>,
61 storage: AccountStorageMap,
63 next_append_vec_id: Arc<AtomicAccountsFileId>,
65 processed_slot_count: AtomicUsize,
67 num_collisions: AtomicUsize,
69 snapshot_from: SnapshotFrom,
71 storage_access: StorageAccess,
73}
74
75impl SnapshotStorageRebuilder {
76 pub(crate) fn rebuild_storage(
78 file_receiver: Receiver<PathBuf>,
79 num_threads: usize,
80 next_append_vec_id: Arc<AtomicAccountsFileId>,
81 snapshot_from: SnapshotFrom,
82 storage_access: StorageAccess,
83 ) -> Result<RebuiltSnapshotStorage, SnapshotError> {
84 let (snapshot_version_path, snapshot_file_path, append_vec_files) =
85 Self::get_version_and_snapshot_files(&file_receiver);
86 let snapshot_version_str = snapshot_version_from_file(snapshot_version_path)?;
87 let snapshot_version = snapshot_version_str.parse().map_err(|err| {
88 IoError::other(format!(
89 "unsupported snapshot version '{snapshot_version_str}': {err}",
90 ))
91 })?;
92 let snapshot_storage_lengths =
93 Self::process_snapshot_file(snapshot_version, snapshot_file_path)?;
94
95 let account_storage_map = Self::spawn_rebuilder_threads(
96 file_receiver,
97 num_threads,
98 next_append_vec_id,
99 snapshot_storage_lengths,
100 append_vec_files,
101 snapshot_from,
102 storage_access,
103 )?;
104
105 Ok(RebuiltSnapshotStorage {
106 snapshot_version,
107 storage: account_storage_map,
108 })
109 }
110
111 fn new(
114 file_receiver: Receiver<PathBuf>,
115 num_threads: usize,
116 next_append_vec_id: Arc<AtomicAccountsFileId>,
117 snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
118 snapshot_from: SnapshotFrom,
119 storage_access: StorageAccess,
120 ) -> Self {
121 let storage = DashMap::with_capacity(snapshot_storage_lengths.len());
122 let storage_paths: DashMap<_, _> = snapshot_storage_lengths
123 .iter()
124 .map(|(slot, storage_lengths)| {
125 (*slot, Mutex::new(Vec::with_capacity(storage_lengths.len())))
126 })
127 .collect();
128 Self {
129 file_receiver,
130 num_threads,
131 snapshot_storage_lengths,
132 storage_paths,
133 storage,
134 next_append_vec_id,
135 processed_slot_count: AtomicUsize::new(0),
136 num_collisions: AtomicUsize::new(0),
137 snapshot_from,
138 storage_access,
139 }
140 }
141
142 fn get_version_and_snapshot_files(
146 file_receiver: &Receiver<PathBuf>,
147 ) -> (PathBuf, PathBuf, Vec<PathBuf>) {
148 let mut append_vec_files = Vec::with_capacity(1024);
149 let mut snapshot_version_path = None;
150 let mut snapshot_file_path = None;
151
152 loop {
153 if let Ok(path) = file_receiver.recv() {
154 let filename = path.file_name().unwrap().to_str().unwrap();
155 match get_snapshot_file_kind(filename) {
156 Some(SnapshotFileKind::Version) => {
157 snapshot_version_path = Some(path);
158
159 if snapshot_file_path.is_some() {
161 break;
162 }
163 }
164 Some(SnapshotFileKind::BankFields) => {
165 snapshot_file_path = Some(path);
166
167 if snapshot_version_path.is_some() {
169 break;
170 }
171 }
172 Some(SnapshotFileKind::Storage) => {
173 append_vec_files.push(path);
174 }
175 None => {} }
177 } else {
178 panic!("did not receive snapshot file from unpacking threads");
179 }
180 }
181 let snapshot_version_path = snapshot_version_path.unwrap();
182 let snapshot_file_path = snapshot_file_path.unwrap();
183
184 (snapshot_version_path, snapshot_file_path, append_vec_files)
185 }
186
187 fn process_snapshot_file(
189 snapshot_version: SnapshotVersion,
190 snapshot_file_path: PathBuf,
191 ) -> Result<HashMap<Slot, HashMap<usize, usize>>, bincode::Error> {
192 let snapshot_file = File::open(snapshot_file_path).unwrap();
193 let mut snapshot_stream = BufReader::new(snapshot_file);
194 match snapshot_version {
195 SnapshotVersion::V1_2_0 => {
196 let (_bank_fields, accounts_fields) =
197 serde_snapshot::fields_from_stream(&mut snapshot_stream)?;
198
199 Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
200 }
201 }
202 }
203
204 fn spawn_rebuilder_threads(
206 file_receiver: Receiver<PathBuf>,
207 num_threads: usize,
208 next_append_vec_id: Arc<AtomicAccountsFileId>,
209 snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
210 append_vec_files: Vec<PathBuf>,
211 snapshot_from: SnapshotFrom,
212 storage_access: StorageAccess,
213 ) -> Result<AccountStorageMap, SnapshotError> {
214 let rebuilder = Arc::new(SnapshotStorageRebuilder::new(
215 file_receiver,
216 num_threads,
217 next_append_vec_id,
218 snapshot_storage_lengths,
219 snapshot_from,
220 storage_access,
221 ));
222
223 let thread_pool = rebuilder.build_thread_pool();
224
225 if snapshot_from == SnapshotFrom::Archive {
226 thread_pool.install(|| rebuilder.process_buffered_files(append_vec_files))?;
228 }
229
230 let (exit_sender, exit_receiver) = unbounded();
232 for _ in 0..rebuilder.num_threads {
233 Self::spawn_receiver_thread(&thread_pool, exit_sender.clone(), rebuilder.clone());
234 }
235 drop(exit_sender); rebuilder.wait_for_completion(exit_receiver)?;
239 Ok(Arc::try_unwrap(rebuilder).unwrap().storage)
240 }
241
242 fn process_buffered_files(&self, append_vec_files: Vec<PathBuf>) -> Result<(), SnapshotError> {
244 append_vec_files
245 .into_par_iter()
246 .map(|path| self.process_append_vec_file(path))
247 .collect::<Result<(), SnapshotError>>()
248 }
249
250 fn spawn_receiver_thread(
252 thread_pool: &ThreadPool,
253 exit_sender: Sender<Result<(), SnapshotError>>,
254 rebuilder: Arc<SnapshotStorageRebuilder>,
255 ) {
256 thread_pool.spawn(move || {
257 for path in rebuilder.file_receiver.iter() {
258 match rebuilder.process_append_vec_file(path) {
259 Ok(_) => {}
260 Err(err) => {
261 exit_sender
262 .send(Err(err))
263 .expect("sender should be connected");
264 return;
265 }
266 }
267 }
268
269 exit_sender
270 .send(Ok(()))
271 .expect("sender should be connected");
272 })
273 }
274
275 fn process_append_vec_file(&self, path: PathBuf) -> Result<(), SnapshotError> {
277 let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
278 if let Ok((slot, append_vec_id)) = get_slot_and_append_vec_id(&filename) {
279 if self.snapshot_from == SnapshotFrom::Dir {
280 self.next_append_vec_id
285 .fetch_max((append_vec_id + 1) as AccountsFileId, Ordering::Relaxed);
286 }
287 let slot_storage_count = self.insert_storage_file(&slot, path);
288 if slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len() {
289 self.process_complete_slot(slot)?;
291 self.processed_slot_count.fetch_add(1, Ordering::AcqRel);
292 }
293 }
294 Ok(())
295 }
296
297 fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize {
299 let slot_paths = self.storage_paths.get(slot).unwrap();
300 let mut lock = slot_paths.lock().unwrap();
301 lock.push(path);
302 lock.len()
303 }
304
305 fn process_complete_slot(&self, slot: Slot) -> Result<(), SnapshotError> {
307 let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
308 let lock = slot_storage_paths.lock().unwrap();
309
310 let slot_stores = lock
311 .iter()
312 .map(|path| {
313 let filename = path.file_name().unwrap().to_str().unwrap();
314 let (_, old_append_vec_id) = get_slot_and_append_vec_id(filename)?;
315 let current_len = *self
316 .snapshot_storage_lengths
317 .get(&slot)
318 .unwrap()
319 .get(&old_append_vec_id)
320 .unwrap();
321
322 let storage_entry = match &self.snapshot_from {
323 SnapshotFrom::Archive => remap_and_reconstruct_single_storage(
324 slot,
325 old_append_vec_id,
326 current_len,
327 path.as_path(),
328 &self.next_append_vec_id,
329 &self.num_collisions,
330 self.storage_access,
331 )?,
332 SnapshotFrom::Dir => reconstruct_single_storage(
333 &slot,
334 path.as_path(),
335 current_len,
336 old_append_vec_id as AccountsFileId,
337 self.storage_access,
338 )?,
339 };
340
341 Ok((storage_entry.id(), storage_entry))
342 })
343 .collect::<Result<HashMap<AccountsFileId, Arc<AccountStorageEntry>>, SnapshotError>>(
344 )?;
345
346 if slot_stores.len() != 1 {
347 return Err(SnapshotError::RebuildStorages(format!(
348 "there must be exactly one storage per slot, but slot {slot} has {} storages",
349 slot_stores.len()
350 )));
351 }
352 let (id, storage) = slot_stores.into_iter().next().unwrap();
355
356 self.storage
357 .insert(slot, AccountStorageReference { id, storage });
358 Ok(())
359 }
360
361 fn wait_for_completion(
363 &self,
364 exit_receiver: Receiver<Result<(), SnapshotError>>,
365 ) -> Result<(), SnapshotError> {
366 let num_slots = self.snapshot_storage_lengths.len();
367 let mut last_log_time = Instant::now();
368 loop {
369 select! {
370 recv(exit_receiver) -> maybe_exit_signal => {
371 match maybe_exit_signal {
372 Ok(Ok(_)) => continue, Ok(Err(err)) => { return Err(err);
375 }
376 Err(_) => break, }
378 }
379 default(std::time::Duration::from_millis(100)) => {
380 let now = Instant::now();
381 if now.duration_since(last_log_time).as_millis() >= 2000 {
382 let num_processed_slots = self.processed_slot_count.load(Ordering::Relaxed);
383 let num_collisions = self.num_collisions.load(Ordering::Relaxed);
384 info!("rebuilt storages for {num_processed_slots}/{num_slots} slots with {num_collisions} collisions");
385 last_log_time = now;
386 }
387 }
388 }
389 }
390
391 Ok(())
392 }
393
394 fn build_thread_pool(&self) -> ThreadPool {
396 ThreadPoolBuilder::default()
397 .thread_name(|i| format!("solRbuildSnap{i:02}"))
398 .num_threads(self.num_threads)
399 .build()
400 .expect("new rayon threadpool")
401 }
402}
403
/// Kinds of files the rebuilder recognizes among the unpacked snapshot files.
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
    /// The file holding the snapshot version string.
    Version,
    /// The serialized bank fields file (named by digits, optionally with `.pre`).
    BankFields,
    /// An account storage file named `<slot>.<id>`.
    Storage,
}
411
412fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
414 if VERSION_FILE_REGEX.is_match(filename) {
415 Some(SnapshotFileKind::Version)
416 } else if BANK_FIELDS_FILE_REGEX.is_match(filename) {
417 Some(SnapshotFileKind::BankFields)
418 } else if get_slot_and_append_vec_id(filename).is_ok() {
419 Some(SnapshotFileKind::Storage)
420 } else {
421 None
422 }
423}
424
425pub(crate) fn get_slot_and_append_vec_id(filename: &str) -> Result<(Slot, usize), SnapshotError> {
427 let mut parts = filename.splitn(2, '.');
428 let slot = parts.next().and_then(|s| Slot::from_str(s).ok());
429 let id = parts.next().and_then(|s| usize::from_str(s).ok());
430
431 slot.zip(id)
432 .ok_or_else(|| SnapshotError::InvalidAppendVecPath(PathBuf::from(filename)))
433}
434
#[cfg(test)]
mod tests {
    use {
        super::*, crate::snapshot_utils::SNAPSHOT_VERSION_FILENAME,
        solana_accounts_db::accounts_file::AccountsFile,
    };

    #[test]
    fn test_get_snapshot_file_kind() {
        assert_eq!(None, get_snapshot_file_kind("file.txt"));
        assert_eq!(
            Some(SnapshotFileKind::Version),
            get_snapshot_file_kind(SNAPSHOT_VERSION_FILENAME)
        );
        assert_eq!(
            Some(SnapshotFileKind::BankFields),
            get_snapshot_file_kind("1234")
        );
        assert_eq!(
            Some(SnapshotFileKind::Storage),
            get_snapshot_file_kind("1000.999")
        );
    }

    /// Covers the `.pre` bank fields suffix and names that must not be classified.
    #[test]
    fn test_get_snapshot_file_kind_edge_cases() {
        assert_eq!(
            Some(SnapshotFileKind::BankFields),
            get_snapshot_file_kind("1234.pre")
        );
        assert_eq!(None, get_snapshot_file_kind(""));
        assert_eq!(None, get_snapshot_file_kind("1000.abc"));
        // Only the first `.` splits slot from id, so `999.1` is not a valid id.
        assert_eq!(None, get_snapshot_file_kind("1000.999.1"));
    }

    #[test]
    fn test_get_slot_and_append_vec_id() {
        let expected_slot = 12345;
        let expected_id = 9987;
        let (slot, id) =
            get_slot_and_append_vec_id(&AccountsFile::file_name(expected_slot, expected_id))
                .unwrap();
        assert_eq!(expected_slot, slot);
        assert_eq!(expected_id as usize, id);
    }

    #[test]
    fn test_get_slot_and_append_vec_id_invalid() {
        assert!(get_slot_and_append_vec_id("").is_err());
        assert!(get_slot_and_append_vec_id("1234").is_err());
        assert!(get_slot_and_append_vec_id("abc.1").is_err());
        assert!(get_slot_and_append_vec_id("1.abc").is_err());
        assert!(get_slot_and_append_vec_id("1.2.3").is_err());
    }
}