1mod stats;
6use {
7    crate::{
8        accounts_hash::CalcAccountsHashConfig,
9        bank::{Bank, BankSlotDelta, DropCallback},
10        bank_forks::BankForks,
11        snapshot_config::SnapshotConfig,
12        snapshot_package::{PendingAccountsPackage, SnapshotType},
13        snapshot_utils::{self, SnapshotError},
14    },
15    crossbeam_channel::{Receiver, SendError, Sender, TrySendError},
16    log::*,
17    rand::{thread_rng, Rng},
18    safecoin_measure::measure::Measure,
19    solana_sdk::{
20        clock::{BankId, Slot},
21        hash::Hash,
22    },
23    stats::StatsManager,
24    std::{
25        boxed::Box,
26        fmt::{Debug, Formatter},
27        sync::{
28            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
29            Arc, RwLock,
30        },
31        thread::{self, sleep, Builder, JoinHandle},
32        time::{Duration, Instant},
33    },
34};
35
/// Sleep period, in milliseconds, between iterations of the background service loop.
const INTERVAL_MS: u64 = 100;
/// Target number of accounts shrunk per second on the non-cached (stale slot) shrink path.
const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
/// How many loop intervals fit in one second.
const INTERVALS_PER_SEC: usize = 1000 / INTERVAL_MS as usize;
/// Shrink budget for a single loop interval, derived from the per-second target.
const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize = SHRUNKEN_ACCOUNT_PER_SEC / INTERVALS_PER_SEC;
/// Base number of blocks between two clean passes of the background loop.
const CLEAN_INTERVAL_BLOCKS: u64 = 100;
41
/// Minimum number of whole seconds between two `expire_old_recycle_stores` calls made by
/// the background loop: a third of `accounts_db::EXPIRATION_TTL_SECONDS` (presumably so
/// expired recycle stores are noticed well within one TTL — confirm against `accounts_db`).
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;
48
/// Sending half of the channel that carries snapshot requests to the background service.
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
/// Receiving half of the snapshot request channel.
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Sending half of the channel that carries `(slot, bank_id)` of dropped banks.
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
/// Receiving half of the dropped-banks channel.
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// Minimum time between two `bank_drop_queue_event` reports. It is compared against
/// `solana_sdk::timing::timestamp()`, so the unit is presumably milliseconds.
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// Capacity of the bounded dropped-banks channel created by `setup_bank_drop_callback`.
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
58
/// Kinds of events recorded when signalling a dropped bank over the bounded
/// dropped-banks channel.
// `dead_code` is allowed so that a variant (and the counter it selects in
// `BankDropQueueStats`) may exist even while no code path constructs it.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BankDropQueueEvent {
    /// A non-blocking send found the channel at capacity.
    Full,
    /// The receiving side of the channel is gone.
    Disconnected,
}
65
/// Counters for dropped-bank signal queue events, flushed to metrics at most once per
/// `BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL`.
#[derive(Debug, Default)]
struct BankDropQueueStats {
    /// Value of `solana_sdk::timing::timestamp()` at the last report; shared by all event kinds.
    report_time: AtomicU64,
    /// Number of `Full` events since this counter was last reported and reset.
    queue_full: AtomicUsize,
    /// Number of `Disconnected` events since this counter was last reported and reset.
    queue_disconnected: AtomicUsize,
}
73
74impl BankDropQueueStats {
75    fn increase(&self, event: BankDropQueueEvent) {
77        let counter = match event {
78            BankDropQueueEvent::Full => &self.queue_full,
79            BankDropQueueEvent::Disconnected => &self.queue_disconnected,
80        };
81
82        counter.fetch_add(1, Ordering::Relaxed);
83    }
84
85    fn report(&self, event: BankDropQueueEvent) {
87        let counter = match event {
88            BankDropQueueEvent::Full => &self.queue_full,
89            BankDropQueueEvent::Disconnected => &self.queue_disconnected,
90        };
91
92        let name = match event {
93            BankDropQueueEvent::Full => "full",
94            BankDropQueueEvent::Disconnected => "disconnected",
95        };
96
97        let ts = solana_sdk::timing::timestamp();
98        let last_report_time = self.report_time.load(Ordering::Acquire);
99        if ts.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL {
100            let val = counter.load(Ordering::Relaxed);
101
102            if counter
103                .compare_exchange_weak(val, 0, Ordering::AcqRel, Ordering::Acquire)
104                .is_ok()
105            {
106                if val > 0 {
107                    datapoint_info!("bank_drop_queue_event", (name, val, i64));
108                }
109                self.report_time.store(ts, Ordering::Release);
110            }
111        }
112    }
113}
114
lazy_static! {
    /// Process-wide drop-queue counters updated by every `SendDroppedBankCallback`.
    static ref BANK_DROP_QUEUE_STATS: BankDropQueueStats = BankDropQueueStats::default();
}
118
/// Bank drop callback that forwards the `(slot, bank_id)` of each dropped bank over a
/// channel (presumably drained by `AbsRequestHandler::handle_pruned_banks`, which purges
/// those slots — confirm at the call site that wires the two ends together).
#[derive(Clone)]
pub struct SendDroppedBankCallback {
    /// Sending half of the dropped-banks channel.
    sender: DroppedSlotsSender,
}
123
124impl DropCallback for SendDroppedBankCallback {
125    fn callback(&self, bank: &Bank) {
126        BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
127        match self.sender.try_send((bank.slot(), bank.bank_id())) {
128            Err(TrySendError::Full(_)) => {
129                BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Full);
130                BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
131
132                let _ = self.sender.send((bank.slot(), bank.bank_id()));
134            }
135
136            Err(TrySendError::Disconnected(_)) => {
137                info!("bank DropCallback signal queue disconnected.");
138            }
139            Ok(_) => {}
141        }
142    }
143
144    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
145        Box::new(self.clone())
146    }
147}
148
149impl Debug for SendDroppedBankCallback {
150    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
151        write!(f, "SendDroppedBankCallback({:p})", self)
152    }
153}
154
155impl SendDroppedBankCallback {
156    pub fn new(sender: DroppedSlotsSender) -> Self {
157        Self { sender }
158    }
159}
160
/// A request for the accounts background service to snapshot a bank.
pub struct SnapshotRequest {
    /// The (root) bank to snapshot.
    pub snapshot_root_bank: Arc<Bank>,
    /// Status cache slot deltas handed to `snapshot_utils::snapshot_bank` with the bank.
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
}
165
/// Consumes `SnapshotRequest`s and turns them into bank snapshots / snapshot archives.
pub struct SnapshotRequestHandler {
    /// Snapshot directories, archive intervals, snapshot version and archive format.
    pub snapshot_config: SnapshotConfig,
    /// Incoming requests; each `handle_snapshot_requests` call processes only the newest one.
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    /// Passed through to `snapshot_utils::snapshot_bank`.
    pub pending_accounts_package: PendingAccountsPackage,
}
171
impl SnapshotRequestHandler {
    /// Handles the most recent pending snapshot request, if any.
    ///
    /// All requests currently queued on `snapshot_request_receiver` are drained; only the
    /// last one is processed and older ones are discarded. For that request this:
    /// shrinks or flushes the accounts, cleans and (with caching) shrinks again, picks a
    /// full snapshot, an incremental snapshot or no archive type (from the `snapshot_config`
    /// intervals and the bank's block height), calls `snapshot_utils::snapshot_bank`,
    /// purges old bank snapshots and emits a `handle_snapshot_requests-timing` datapoint.
    ///
    /// * `accounts_db_caching_enabled` - flush the accounts cache and shrink candidate slots
    ///   (`true`), or shrink stale slots within `SHRUNKEN_ACCOUNT_PER_INTERVAL` (`false`).
    /// * `test_hash_calculation` - compute the accounts hash before and after flushing and
    ///   assert that both it and the capitalization match.
    /// * `non_snapshot_time_us` - only reported in the timing datapoint.
    /// * `last_full_snapshot_slot` - passed to `clean_accounts`; set to this bank's slot when
    ///   a full snapshot is chosen, and used as the base of incremental snapshots.
    ///
    /// Returns `None` if no request was pending, `Some(Ok(block_height))` of the snapshotted
    /// bank on success, and `Some(Err(_))` if `snapshot_bank` failed with a fatal error.
    ///
    /// # Panics
    /// If startup verification of the bank is not complete, if (with caching) the cache
    /// flush did not reach the bank's slot, or if the test hash checks fail.
    pub fn handle_snapshot_requests(
        &self,
        accounts_db_caching_enabled: bool,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        last_full_snapshot_slot: &mut Option<Slot>,
    ) -> Option<Result<u64, SnapshotError>> {
        self.snapshot_request_receiver
            .try_iter()
            // Drain everything queued and keep only the newest request.
            .last()
            .map(|snapshot_request| {
                let mut total_time = Measure::start("snapshot_request_receiver_total_time");
                let SnapshotRequest {
                    snapshot_root_bank,
                    status_cache_slot_deltas,
                } = snapshot_request;

                // Snapshots must not be taken before startup verification has finished.
                assert!(snapshot_root_bank.is_startup_verification_complete());

                // Test-only: capture the accounts hash before flushing so it can be compared
                // with an independent calculation further down.
                let previous_hash = if test_hash_calculation {
                    snapshot_root_bank.update_accounts_hash_with_index_option(true, false, false)
                } else {
                    Hash::default()
                };

                // Without the accounts cache, shrink stale slots within one interval's budget.
                let mut shrink_time = Measure::start("shrink_time");
                if !accounts_db_caching_enabled {
                    snapshot_root_bank
                        .process_stale_slot_with_budget(0, SHRUNKEN_ACCOUNT_PER_INTERVAL);
                }
                shrink_time.stop();

                let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
                if accounts_db_caching_enabled {
                    snapshot_root_bank.force_flush_accounts_cache();
                    // The flush must have reached at least the snapshot slot (presumably so
                    // the snapshot reads no state that still lives only in the cache).
                    assert!(
                        snapshot_root_bank.slot()
                            <= snapshot_root_bank
                                .rc
                                .accounts
                                .accounts_db
                                .accounts_cache
                                .fetch_max_flush_root()
                    );
                }
                flush_accounts_cache_time.stop();

                // Test-only: recompute the hash with `calculate_accounts_hash_helper` (not
                // using the index) and check it, and the capitalization, against the bank.
                let hash_for_testing = if test_hash_calculation {
                    let use_index_hash_calculation = false;
                    let check_hash = false;

                    let (this_hash, capitalization) = snapshot_root_bank.accounts().accounts_db.calculate_accounts_hash_helper(
                        use_index_hash_calculation,
                        snapshot_root_bank.slot(),
                        &CalcAccountsHashConfig {
                            use_bg_thread_pool: true,
                            check_hash,
                            ancestors: None,
                            use_write_cache: false,
                            epoch_schedule: snapshot_root_bank.epoch_schedule(),
                            rent_collector: snapshot_root_bank.rent_collector(),
                            store_detailed_debug_info_on_failure: false,
                            full_snapshot: None,
                        },
                    ).unwrap();
                    assert_eq!(previous_hash, this_hash);
                    assert_eq!(capitalization, snapshot_root_bank.capitalization());
                    Some(this_hash)
                } else {
                    None
                };

                let mut clean_time = Measure::start("clean_time");
                snapshot_root_bank.clean_accounts(true, false, *last_full_snapshot_slot);
                clean_time.stop();

                // With the cache enabled, shrinking happens after clean; this replaces the
                // earlier shrink measurement, which timed no work in that mode.
                if accounts_db_caching_enabled {
                    shrink_time = Measure::start("shrink_time");
                    snapshot_root_bank.shrink_candidate_slots();
                    shrink_time.stop();
                }

                // Pick the archive type. A full snapshot wins and becomes the new base slot for
                // later incremental snapshots and cleans.
                let block_height = snapshot_root_bank.block_height();
                let snapshot_type = if snapshot_utils::should_take_full_snapshot(
                    block_height,
                    self.snapshot_config.full_snapshot_archive_interval_slots,
                ) {
                    *last_full_snapshot_slot = Some(snapshot_root_bank.slot());
                    Some(SnapshotType::FullSnapshot)
                } else if snapshot_utils::should_take_incremental_snapshot(
                    block_height,
                    self.snapshot_config
                        .incremental_snapshot_archive_interval_slots,
                    *last_full_snapshot_slot,
                ) {
                    // NOTE(review): relies on `should_take_incremental_snapshot` returning true
                    // only when `last_full_snapshot_slot` is `Some`; otherwise this unwrap panics.
                    Some(SnapshotType::IncrementalSnapshot(
                        last_full_snapshot_slot.unwrap(),
                    ))
                } else {
                    // No archive type; `snapshot_bank` is still called with `None`.
                    None
                };

                let mut snapshot_time = Measure::start("snapshot_time");
                let result = snapshot_utils::snapshot_bank(
                    &snapshot_root_bank,
                    status_cache_slot_deltas,
                    &self.pending_accounts_package,
                    &self.snapshot_config.bank_snapshots_dir,
                    &self.snapshot_config.full_snapshot_archives_dir,
                    &self.snapshot_config.incremental_snapshot_archives_dir,
                    self.snapshot_config.snapshot_version,
                    self.snapshot_config.archive_format,
                    hash_for_testing,
                    snapshot_type,
                );
                if let Err(e) = result {
                    warn!(
                        "Error taking bank snapshot. slot: {}, snapshot type: {:?}, err: {:?}",
                        snapshot_root_bank.slot(),
                        snapshot_type,
                        e,
                    );

                    // Fatal errors are propagated to the caller; no purge or timing datapoint
                    // happens for this request.
                    if Self::is_snapshot_error_fatal(&e) {
                        return Err(e);
                    }
                }
                snapshot_time.stop();
                info!("Took bank snapshot. snapshot type: {:?}, slot: {}, accounts hash: {}, bank hash: {}",
                      snapshot_type,
                      snapshot_root_bank.slot(),
                      snapshot_root_bank.get_accounts_hash(),
                      snapshot_root_bank.hash(),
                  );

                let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
                snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.bank_snapshots_dir);
                purge_old_snapshots_time.stop();
                total_time.stop();

                datapoint_info!(
                    "handle_snapshot_requests-timing",
                    (
                        "flush_accounts_cache_time",
                        flush_accounts_cache_time.as_us(),
                        i64
                    ),
                    ("shrink_time", shrink_time.as_us(), i64),
                    ("clean_time", clean_time.as_us(), i64),
                    ("snapshot_time", snapshot_time.as_us(), i64),
                    (
                        "purge_old_snapshots_time",
                        purge_old_snapshots_time.as_us(),
                        i64
                    ),
                    ("total_us", total_time.as_us(), i64),
                    ("non_snapshot_time_us", non_snapshot_time_us, i64),
                );
                Ok(snapshot_root_bank.block_height())
            })
    }

    /// Returns whether `err` must abort snapshot handling.
    ///
    /// Every variant is currently fatal. The match is deliberately exhaustive (no `_` arm),
    /// so adding a `SnapshotError` variant forces an explicit decision here.
    fn is_snapshot_error_fatal(err: &SnapshotError) -> bool {
        match err {
            SnapshotError::Io(..) => true,
            SnapshotError::Serialize(..) => true,
            SnapshotError::ArchiveGenerationFailure(..) => true,
            SnapshotError::StoragePathSymlinkInvalid => true,
            SnapshotError::UnpackError(..) => true,
            SnapshotError::IoWithSource(..) => true,
            SnapshotError::PathToFileNameError(..) => true,
            SnapshotError::FileNameToStrError(..) => true,
            SnapshotError::ParseSnapshotArchiveFileNameError(..) => true,
            SnapshotError::MismatchedBaseSlot(..) => true,
            SnapshotError::NoSnapshotArchives => true,
            SnapshotError::MismatchedSlotHash(..) => true,
            SnapshotError::VerifySlotDeltas(..) => true,
        }
    }
}
377
/// Submits snapshot requests to the accounts background service.
///
/// The `Default` value holds no channel, which disables snapshot creation: requests
/// sent through it are accepted and silently dropped.
#[derive(Default, Clone)]
pub struct AbsRequestSender {
    snapshot_request_sender: Option<SnapshotRequestSender>,
}
382
383impl AbsRequestSender {
384    pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
385        Self {
386            snapshot_request_sender: Some(snapshot_request_sender),
387        }
388    }
389
390    pub fn is_snapshot_creation_enabled(&self) -> bool {
391        self.snapshot_request_sender.is_some()
392    }
393
394    pub fn send_snapshot_request(
395        &self,
396        snapshot_request: SnapshotRequest,
397    ) -> Result<(), SendError<SnapshotRequest>> {
398        if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
399            snapshot_request_sender.send(snapshot_request)
400        } else {
401            Ok(())
402        }
403    }
404}
405
/// Work sources drained by the accounts background service on every loop iteration.
pub struct AbsRequestHandler {
    /// Snapshot request handler; `None` means snapshot requests are never handled.
    pub snapshot_request_handler: Option<SnapshotRequestHandler>,
    /// Receives `(slot, bank_id)` of dropped banks whose slots are purged.
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}
410
411impl AbsRequestHandler {
412    pub fn handle_snapshot_requests(
414        &self,
415        accounts_db_caching_enabled: bool,
416        test_hash_calculation: bool,
417        non_snapshot_time_us: u128,
418        last_full_snapshot_slot: &mut Option<Slot>,
419    ) -> Option<Result<u64, SnapshotError>> {
420        self.snapshot_request_handler
421            .as_ref()
422            .and_then(|snapshot_request_handler| {
423                snapshot_request_handler.handle_snapshot_requests(
424                    accounts_db_caching_enabled,
425                    test_hash_calculation,
426                    non_snapshot_time_us,
427                    last_full_snapshot_slot,
428                )
429            })
430    }
431
432    pub fn handle_pruned_banks(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
433        let mut count = 0;
434        for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
435            count += 1;
436            bank.rc.accounts.accounts_db.purge_slot(
437                pruned_slot,
438                pruned_bank_id,
439                is_serialized_with_abs,
440            );
441        }
442
443        count
444    }
445}
446
/// Handle to the `solBgAccounts` thread, which purges dropped banks, handles snapshot
/// requests and periodically flushes, shrinks and cleans the root bank's accounts.
pub struct AccountsBackgroundService {
    /// Join handle of the background thread.
    t_background: JoinHandle<()>,
}
450
impl AccountsBackgroundService {
    /// Spawns the `solBgAccounts` thread and returns a handle to it.
    ///
    /// On each iteration the thread works on the current root bank of `bank_forks` and:
    /// 1. purges slots of dropped banks queued on `request_handler`,
    /// 2. expires old recycle stores at most once per `RECYCLE_STORE_EXPIRATION_INTERVAL_SECS`,
    /// 3. handles the newest pending snapshot request, if any,
    /// 4. flushes the accounts cache if needed (caching enabled only),
    /// 5. if no snapshot was handled, shrinks. Once more than `CLEAN_INTERVAL_BLOCKS` plus a
    ///    random 0..10 blocks have passed since the last clean, it also cleans accounts.
    ///
    /// Each iteration is then followed by a `INTERVAL_MS` millisecond sleep. The thread
    /// stops when `exit` is set, and sets `exit` itself when a snapshot request fails fatally.
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: &Arc<AtomicBool>,
        request_handler: AbsRequestHandler,
        accounts_db_caching_enabled: bool,
        test_hash_calculation: bool,
        mut last_full_snapshot_slot: Option<Slot>,
    ) -> Self {
        info!("AccountsBackgroundService active");
        let exit = exit.clone();
        // Loop state; all of it is moved into the background thread below.
        let mut consumed_budget = 0;
        let mut last_cleaned_block_height = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let mut last_expiration_check_time = Instant::now();
        let t_background = Builder::new()
            .name("solBgAccounts".to_string())
            .spawn(move || {
                let mut stats = StatsManager::new();
                let mut last_snapshot_end_time = None;
                loop {
                    if exit.load(Ordering::Relaxed) {
                        break;
                    }
                    let start_time = Instant::now();

                    // Take an owned handle to the current root bank; the `bank_forks` read
                    // guard is a temporary and is released at the end of this statement.
                    let bank = bank_forks.read().unwrap().root_bank().clone();

                    Self::remove_dead_slots(
                        &bank,
                        &request_handler,
                        &mut removed_slots_count,
                        &mut total_remove_slots_time,
                    );

                    Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);

                    // Time since the previous snapshot finished (0 before the first one).
                    let non_snapshot_time = last_snapshot_end_time
                        .map(|last_snapshot_end_time: Instant| {
                            last_snapshot_end_time.elapsed().as_micros()
                        })
                        .unwrap_or_default();

                    // Handles at most one (the newest) pending snapshot request.
                    let snapshot_block_height_option_result = request_handler
                        .handle_snapshot_requests(
                            accounts_db_caching_enabled,
                            test_hash_calculation,
                            non_snapshot_time,
                            &mut last_full_snapshot_slot,
                        );
                    if snapshot_block_height_option_result.is_some() {
                        last_snapshot_end_time = Some(Instant::now());
                    }

                    if accounts_db_caching_enabled {
                        bank.flush_accounts_cache_if_needed();
                    }

                    if let Some(snapshot_block_height_result) = snapshot_block_height_option_result
                    {
                        if let Ok(snapshot_block_height) = snapshot_block_height_result {
                            // The snapshot path already cleaned the snapshotted bank, so it counts
                            // as the latest clean; clean block heights must never go backwards.
                            assert!(last_cleaned_block_height <= snapshot_block_height);
                            last_cleaned_block_height = snapshot_block_height;
                        } else {
                            // Only fatal snapshot errors reach here: request shutdown and stop.
                            exit.store(true, Ordering::Relaxed);
                            return;
                        }
                    } else {
                        // No snapshot this iteration: do the regular shrink (and maybe clean).
                        if accounts_db_caching_enabled {
                            bank.shrink_candidate_slots();
                        } else {
                            // The returned value is fed back on the next iteration, capped at
                            // one interval's budget.
                            consumed_budget = bank
                                .process_stale_slot_with_budget(
                                    consumed_budget,
                                    SHRUNKEN_ACCOUNT_PER_INTERVAL,
                                )
                                .min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
                        }
                        // Clean after roughly CLEAN_INTERVAL_BLOCKS blocks, plus 0..10 random
                        // blocks of jitter.
                        // NOTE(review): plain `-` assumes the root's block height never drops
                        // below `last_cleaned_block_height`; it would underflow otherwise.
                        if bank.block_height() - last_cleaned_block_height
                            > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
                        {
                            if accounts_db_caching_enabled {
                                // Flush the cache before cleaning (presumably so clean sees
                                // all rooted accounts in storage — confirm).
                                bank.force_flush_accounts_cache();
                            }
                            bank.clean_accounts(true, false, last_full_snapshot_slot);
                            last_cleaned_block_height = bank.block_height();
                        }
                    }
                    stats.record_and_maybe_submit(start_time.elapsed());
                    sleep(Duration::from_millis(INTERVAL_MS));
                }
            })
            .unwrap();
        Self { t_background }
    }

    /// Installs a drop callback on the root bank of `bank_forks` and returns the receiving
    /// half of the bounded (`MAX_DROP_BANK_SIGNAL_QUEUE_SIZE`) channel whose sender is
    /// handed to `create_drop_bank_callback`.
    ///
    /// # Panics
    /// Panics unless `bank_forks` currently holds exactly one bank.
    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);

        let (pruned_banks_sender, pruned_banks_receiver) =
            crossbeam_channel::bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
        // Inner scope limits how long `root_bank` is held.
        {
            let root_bank = bank_forks.read().unwrap().root_bank();
            root_bank.set_callback(Some(Box::new(
                root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .create_drop_bank_callback(pruned_banks_sender),
            )));
        }
        pruned_banks_receiver
    }

    /// Waits for the background thread to finish.
    pub fn join(self) -> thread::Result<()> {
        self.t_background.join()
    }

    /// Purges all currently queued dropped-bank slots using `bank`'s accounts db and
    /// accumulates the count and elapsed microseconds into the given totals. Once at least
    /// 100 slots have accumulated, a `remove_slots_timing` datapoint is emitted and both
    /// totals are reset.
    fn remove_dead_slots(
        bank: &Bank,
        request_handler: &AbsRequestHandler,
        removed_slots_count: &mut usize,
        total_remove_slots_time: &mut u64,
    ) {
        let mut remove_slots_time = Measure::start("remove_slots_time");
        // `true` is passed as `is_serialized_with_abs`; this is the call made from the
        // background service loop.
        *removed_slots_count += request_handler.handle_pruned_banks(bank, true);
        remove_slots_time.stop();
        *total_remove_slots_time += remove_slots_time.as_us();

        if *removed_slots_count >= 100 {
            datapoint_info!(
                "remove_slots_timing",
                ("remove_slots_time", *total_remove_slots_time, i64),
                ("removed_slots_count", *removed_slots_count, i64),
            );
            *total_remove_slots_time = 0;
            *removed_slots_count = 0;
        }
    }

    /// Calls `bank.expire_old_recycle_stores()` once more than
    /// `RECYCLE_STORE_EXPIRATION_INTERVAL_SECS` whole seconds have passed since
    /// `last_expiration_check_time`, then records the current time as the new check time.
    fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
        let now = Instant::now();
        if now.duration_since(*last_expiration_check_time).as_secs()
            > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
        {
            bank.expire_old_recycle_stores();
            *last_expiration_check_time = now;
        }
    }
}
637
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::genesis_utils::create_genesis_config,
        crossbeam_channel::unbounded,
        solana_sdk::{account::AccountSharedData, pubkey::Pubkey},
    };

    /// Storing an account in slot 0 and then signalling slot 0 as pruned must make
    /// `remove_dead_slots` purge that slot.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis_info = create_genesis_config(10);
        let root_bank = Arc::new(Bank::new_for_tests(&genesis_info.genesis_config));
        let (sender, receiver) = unbounded();
        let handler = AbsRequestHandler {
            snapshot_request_handler: None,
            pruned_banks_receiver: receiver,
        };

        let key = Pubkey::new_unique();
        let account = AccountSharedData::new(264, 0, &Pubkey::default());
        root_bank.store_account(&key, &account);
        assert!(root_bank.get_account(&key).is_some());

        // Announce that (slot 0, bank id 0) has been pruned.
        sender.send((0, 0)).unwrap();

        assert!(!root_bank.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        let mut removed_count = 0;
        let mut removal_time = 0;
        AccountsBackgroundService::remove_dead_slots(
            &root_bank,
            &handler,
            &mut removed_count,
            &mut removal_time,
        );

        assert!(root_bank.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }
}