use {
super::*,
solana_message::AccountKeys,
std::{cmp::max, time::Instant},
};
#[derive(Default)]
pub struct PurgeStats {
delete_range: u64,
write_batch: u64,
delete_file_in_range: u64,
}
#[derive(Clone, Copy)]
pub enum PurgeType {
Exact,
CompactionFilter,
}
impl Blockstore {
pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot, purge_type: PurgeType) {
let mut purge_stats = PurgeStats::default();
let purge_result =
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut purge_stats);
datapoint_info!(
"blockstore-purge",
("from_slot", from_slot as i64, i64),
("to_slot", to_slot as i64, i64),
("delete_range_us", purge_stats.delete_range as i64, i64),
("write_batch_us", purge_stats.write_batch as i64, i64),
(
"delete_file_in_range_us",
purge_stats.delete_file_in_range as i64,
i64
)
);
if let Err(e) = purge_result {
error!("Error: {e:?}; Purge failed in range {from_slot:?} to {to_slot:?}");
}
}
pub fn set_max_expired_slot(&self, to_slot: Slot) {
let to_slot = to_slot.checked_add(1).unwrap();
self.db.set_oldest_slot(to_slot);
if let Err(err) = self.maybe_cleanup_highest_primary_index_slot(to_slot) {
warn!("Could not clean up TransactionStatusIndex: {err:?}");
}
}
pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots(from_slot, to_slot, PurgeType::Exact);
}
pub fn purge_from_next_slots(&self, from_slot: Slot, to_slot: Slot) {
let mut count = 0;
let mut rewritten = 0;
let mut last_print = Instant::now();
let mut total_retain_us = 0;
for (slot, mut meta) in self
.slot_meta_iterator(0)
.expect("unable to iterate over meta")
{
if slot > to_slot {
break;
}
count += 1;
if last_print.elapsed().as_millis() > 2000 {
info!(
"purged: {count} slots rewritten: {rewritten} retain_time: {total_retain_us}us"
);
count = 0;
rewritten = 0;
total_retain_us = 0;
last_print = Instant::now();
}
let mut time = Measure::start("retain");
let original_len = meta.next_slots.len();
meta.next_slots
.retain(|slot| *slot < from_slot || *slot > to_slot);
if meta.next_slots.len() != original_len {
rewritten += 1;
info!(
"purge_from_next_slots: meta for slot {} no longer refers to slots {:?}",
slot,
from_slot..=to_slot
);
self.put_meta(slot, &meta).expect("couldn't update meta");
}
time.stop();
total_retain_us += time.as_us();
}
}
#[cfg(test)]
pub(crate) fn run_purge(
&self,
from_slot: Slot,
to_slot: Slot,
purge_type: PurgeType,
) -> Result<bool> {
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
}
pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
let Some(mut slot_meta) = self.meta(slot)? else {
return Err(BlockstoreError::SlotUnavailable);
};
let mut write_batch = self.get_write_batch()?;
let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;
if let Some(parent_slot) = slot_meta.parent_slot {
let parent_slot_meta = self.meta(parent_slot)?;
if let Some(mut parent_slot_meta) = parent_slot_meta {
parent_slot_meta
.next_slots
.retain(|&next_slot| next_slot != slot);
self.meta_cf
.put_in_batch(&mut write_batch, parent_slot, &parent_slot_meta)?;
} else {
error!(
"Parent slot meta {parent_slot} for child {slot} is missing or cleaned up. \
Falling back to orphan repair to remedy the situation",
);
}
}
slot_meta.clear_unconfirmed_slot();
self.meta_cf
.put_in_batch(&mut write_batch, slot, &slot_meta)?;
self.write_batch(write_batch).inspect_err(|e| {
error!("Error: {e:?} while submitting write batch for slot {slot:?}")
})?;
Ok(columns_purged)
}
pub(crate) fn run_purge_with_stats(
&self,
from_slot: Slot,
to_slot: Slot,
purge_type: PurgeType,
purge_stats: &mut PurgeStats,
) -> Result<bool> {
let mut write_batch = self.get_write_batch()?;
let mut delete_range_timer = Measure::start("delete_range");
let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
delete_range_timer.stop();
let mut write_timer = Measure::start("write_batch");
self.write_batch(write_batch).inspect_err(|e| {
error!(
"Error: {e:?} while submitting write batch for purge from_slot {from_slot} \
to_slot {to_slot}"
)
})?;
write_timer.stop();
let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
if columns_purged && from_slot == 0 {
self.purge_files_in_range(from_slot, to_slot);
}
purge_files_in_range_timer.stop();
purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_file_in_range += purge_files_in_range_timer.as_us();
Ok(columns_purged)
}
fn purge_range(
&self,
write_batch: &mut WriteBatch,
from_slot: Slot,
to_slot: Slot,
purge_type: PurgeType,
) -> Result<bool> {
let columns_purged = self
.meta_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.bank_hash_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.roots_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.data_shred_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.code_shred_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.dead_slots_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.duplicate_slots_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.erasure_meta_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.orphans_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.index_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.rewards_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.blocktime_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.perf_samples_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.block_height_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.optimistic_slots_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok()
& self
.merkle_root_meta_cf
.delete_range_in_batch(write_batch, from_slot, to_slot)
.is_ok();
match purge_type {
PurgeType::Exact => {
self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
}
PurgeType::CompactionFilter => {
}
}
Ok(columns_purged)
}
fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
self.meta_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.bank_hash_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.roots_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.data_shred_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.code_shred_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.dead_slots_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.duplicate_slots_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.erasure_meta_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.orphans_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.index_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.rewards_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.blocktime_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.perf_samples_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.block_height_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.optimistic_slots_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
& self
.merkle_root_meta_cf
.delete_file_in_range(from_slot, to_slot)
.is_ok()
}
fn special_columns_empty(&self) -> Result<bool> {
let transaction_status_empty = self
.transaction_status_cf
.iter(IteratorMode::Start)?
.next()
.is_none();
let address_signatures_empty = self
.address_signatures_cf
.iter(IteratorMode::Start)?
.next()
.is_none();
Ok(transaction_status_empty && address_signatures_empty)
}
fn purge_special_columns_exact(
&self,
batch: &mut WriteBatch,
from_slot: Slot,
to_slot: Slot,
) -> Result<()> {
if self.special_columns_empty()? {
return Ok(());
}
let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default();
let highest_primary_index_slot = self.get_highest_primary_index_slot();
let slot_indexes = |slot: Slot| -> Vec<u64> {
let mut indexes = vec![];
if highest_primary_index_slot.is_none() {
return indexes;
}
if slot <= index0.max_slot && (index0.frozen || slot >= index1.max_slot) {
indexes.push(0);
}
if slot <= index1.max_slot && (index1.frozen || slot >= index0.max_slot) {
indexes.push(1);
}
indexes
};
for slot in from_slot..=to_slot {
let primary_indexes = slot_indexes(slot);
let (slot_entries, _, _) =
self.get_slot_entries_with_shred_info(slot, 0, true )?;
let transactions = slot_entries
.into_iter()
.flat_map(|entry| entry.transactions);
for (i, transaction) in transactions.enumerate() {
if let Some(&signature) = transaction.signatures.first() {
self.transaction_status_cf
.delete_in_batch(batch, (signature, slot))?;
self.transaction_memos_cf
.delete_in_batch(batch, (signature, slot))?;
if !primary_indexes.is_empty() {
self.transaction_memos_cf
.delete_deprecated_in_batch(batch, signature)?;
}
for primary_index in &primary_indexes {
self.transaction_status_cf
.delete_deprecated_in_batch(batch, (*primary_index, signature, slot))?;
}
let meta = self.read_transaction_status((signature, slot))?;
let loaded_addresses = meta.map(|meta| meta.loaded_addresses);
let account_keys = AccountKeys::new(
transaction.message.static_account_keys(),
loaded_addresses.as_ref(),
);
let transaction_index =
u32::try_from(i).map_err(|_| BlockstoreError::TransactionIndexOverflow)?;
for pubkey in account_keys.iter() {
self.address_signatures_cf.delete_in_batch(
batch,
(*pubkey, slot, transaction_index, signature),
)?;
for primary_index in &primary_indexes {
self.address_signatures_cf.delete_deprecated_in_batch(
batch,
(*primary_index, *pubkey, slot, signature),
)?;
}
}
}
}
}
let mut update_highest_primary_index_slot = false;
if index0.max_slot >= from_slot && index0.max_slot <= to_slot {
index0.max_slot = from_slot.saturating_sub(1);
self.transaction_status_index_cf
.put_in_batch(batch, 0, &index0)?;
update_highest_primary_index_slot = true;
}
if index1.max_slot >= from_slot && index1.max_slot <= to_slot {
index1.max_slot = from_slot.saturating_sub(1);
self.transaction_status_index_cf
.put_in_batch(batch, 1, &index1)?;
update_highest_primary_index_slot = true
}
if update_highest_primary_index_slot {
self.set_highest_primary_index_slot(Some(max(index0.max_slot, index1.max_slot)))
}
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use {
super::*,
crate::{
blockstore::tests::make_slot_entries_with_transactions, get_tmp_ledger_path_auto_delete,
},
bincode::serialize,
solana_entry::entry::next_entry_mut,
solana_hash::Hash,
solana_message::Message,
solana_sha256_hasher::hash,
solana_transaction::Transaction,
test_case::test_case,
};
#[test]
fn test_purge_slots() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.purge_and_compact_slots(0, 5);
test_all_empty_or_min(&blockstore, 6);
blockstore.purge_and_compact_slots(0, 50);
test_all_empty_or_min(&blockstore, 100);
test_all_empty_or_min(&blockstore, 0);
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|(_, _)| {
panic!();
});
}
#[test]
fn test_purge_front_of_ledger() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let max_slot = 10;
for x in 0..max_slot {
let random_bytes: [u8; 64] = std::array::from_fn(|_| rand::random::<u8>());
blockstore
.write_transaction_status(
x,
Signature::from(random_bytes),
vec![
(&Pubkey::try_from(&random_bytes[..32]).unwrap(), true),
(&Pubkey::try_from(&random_bytes[32..]).unwrap(), false),
]
.into_iter(),
TransactionStatusMeta::default(),
0,
)
.unwrap();
}
blockstore.run_purge(10, 20, PurgeType::Exact).unwrap();
let status_entries: Vec<_> = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap()
.collect();
assert_eq!(status_entries.len(), 10);
}
fn clear_and_repopulate_transaction_statuses_for_test(blockstore: &Blockstore, max_slot: u64) {
blockstore.run_purge(0, max_slot, PurgeType::Exact).unwrap();
let mut iter = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
assert_eq!(iter.next(), None);
populate_transaction_statuses_for_test(blockstore, 0, max_slot);
}
fn populate_transaction_statuses_for_test(
blockstore: &Blockstore,
min_slot: u64,
max_slot: u64,
) {
for x in min_slot..=max_slot {
let entries = make_slot_entries_with_transactions(1);
let shreds = entries_to_test_shreds(
&entries,
x, x.saturating_sub(1), true, 0, );
blockstore.insert_shreds(shreds, None, false).unwrap();
let signature = entries
.iter()
.filter(|entry| !entry.is_tick())
.cloned()
.flat_map(|entry| entry.transactions)
.map(|transaction| transaction.signatures[0])
.collect::<Vec<Signature>>()[0];
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
x,
signature,
vec![
(&Pubkey::try_from(&random_bytes[..32]).unwrap(), true),
(&Pubkey::try_from(&random_bytes[32..]).unwrap(), false),
]
.into_iter(),
TransactionStatusMeta::default(),
0,
)
.unwrap();
}
}
fn populate_deprecated_transaction_statuses_for_test(
blockstore: &Blockstore,
primary_index: u64,
min_slot: u64,
max_slot: u64,
) {
for x in min_slot..=max_slot {
let entries = make_slot_entries_with_transactions(1);
let shreds = entries_to_test_shreds(
&entries,
x, x.saturating_sub(1), true, 0, );
blockstore.insert_shreds(shreds, None, false).unwrap();
let signature = entries
.iter()
.filter(|entry| !entry.is_tick())
.cloned()
.flat_map(|entry| entry.transactions)
.map(|transaction| transaction.signatures[0])
.collect::<Vec<Signature>>()[0];
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_deprecated_transaction_status(
primary_index,
x,
signature,
vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()],
vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()],
TransactionStatusMeta::default(),
)
.unwrap();
}
}
#[test]
fn test_special_columns_empty() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
assert!(blockstore.special_columns_empty().unwrap());
let num_entries = 1;
let max_slot = 9;
for slot in 0..=max_slot {
let entries = make_slot_entries_with_transactions(num_entries);
let shreds = entries_to_test_shreds(
&entries,
slot,
slot.saturating_sub(1),
true, 0, );
blockstore.insert_shreds(shreds, None, false).unwrap();
for transaction in entries.into_iter().flat_map(|entry| entry.transactions) {
assert_eq!(transaction.signatures.len(), 1);
blockstore
.write_transaction_status(
slot,
transaction.signatures[0],
transaction
.message
.static_account_keys()
.iter()
.map(|key| (key, true)),
TransactionStatusMeta::default(),
0,
)
.unwrap();
}
}
assert!(!blockstore.special_columns_empty().unwrap());
blockstore
.run_purge(0, max_slot - 5, PurgeType::Exact)
.unwrap();
assert!(!blockstore.special_columns_empty().unwrap());
blockstore.run_purge(0, max_slot, PurgeType::Exact).unwrap();
assert!(blockstore.special_columns_empty().unwrap());
}
#[test]
#[allow(clippy::cognitive_complexity)]
fn test_purge_transaction_status_exact() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let max_slot = 9;
clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
blockstore.run_purge(10, 12, PurgeType::Exact).unwrap();
let mut status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
for _ in 0..max_slot + 1 {
let entry = status_entry_iterator.next().unwrap().0;
assert!(entry.1 <= max_slot || entry.1 > 0);
}
assert_eq!(status_entry_iterator.next(), None);
drop(status_entry_iterator);
clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
blockstore.run_purge(2, 4, PurgeType::Exact).unwrap();
let mut status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
for _ in 0..7 {
let entry = status_entry_iterator.next().unwrap().0;
assert!(entry.1 < 2 || entry.1 > 4);
}
assert_eq!(status_entry_iterator.next(), None);
drop(status_entry_iterator);
clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
blockstore
.run_purge(0, max_slot - 1, PurgeType::Exact)
.unwrap();
let mut status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.1, 9);
assert_eq!(status_entry_iterator.next(), None);
drop(status_entry_iterator);
clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
blockstore.run_purge(0, 22, PurgeType::Exact).unwrap();
let mut status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
assert_eq!(status_entry_iterator.next(), None);
}
fn purge_exact(blockstore: &Blockstore, oldest_slot: Slot) {
blockstore
.run_purge(0, oldest_slot - 1, PurgeType::Exact)
.unwrap();
}
fn purge_compaction_filter(blockstore: &Blockstore, oldest_slot: Slot) {
blockstore.db.set_oldest_slot(oldest_slot);
blockstore.transaction_status_cf.compact();
}
#[test_case(purge_exact; "exact")]
#[test_case(purge_compaction_filter; "compaction_filter")]
fn test_purge_special_columns_with_old_data(purge: impl Fn(&Blockstore, Slot)) {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
populate_deprecated_transaction_statuses_for_test(&blockstore, 0, 0, 4);
populate_deprecated_transaction_statuses_for_test(&blockstore, 1, 5, 9);
populate_transaction_statuses_for_test(&blockstore, 10, 14);
let mut index0 = blockstore
.transaction_status_index_cf
.get(0)
.unwrap()
.unwrap_or_default();
index0.frozen = true;
index0.max_slot = 4;
blockstore
.transaction_status_index_cf
.put(0, &index0)
.unwrap();
let mut index1 = blockstore
.transaction_status_index_cf
.get(1)
.unwrap()
.unwrap_or_default();
index1.frozen = false;
index1.max_slot = 9;
blockstore
.transaction_status_index_cf
.put(1, &index1)
.unwrap();
let num_statuses = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap()
.count();
assert_eq!(num_statuses, 15);
let oldest_slot = 3;
purge(&blockstore, oldest_slot);
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 12);
let oldest_slot = 5;
purge(&blockstore, oldest_slot);
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 10);
let oldest_slot = 8;
purge(&blockstore, oldest_slot);
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 7);
let oldest_slot = 10;
purge(&blockstore, oldest_slot);
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 5);
let oldest_slot = 13;
purge(&blockstore, oldest_slot);
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 2);
let oldest_slot = 20;
purge(&blockstore, oldest_slot);
let mut status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
assert!(status_entry_iterator.next().is_none());
}
#[test]
fn test_purge_special_columns_exact_no_sigs() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let slot = 1;
let mut entries: Vec<Entry> = vec![];
for x in 0..5 {
let mut tx = Transaction::new_unsigned(Message::default());
tx.signatures = vec![];
entries.push(next_entry_mut(&mut Hash::default(), 0, vec![tx]));
let mut tick = create_ticks(1, 0, hash(&serialize(&x).unwrap()));
entries.append(&mut tick);
}
let shreds = entries_to_test_shreds(
&entries,
slot,
slot - 1, true, 0, );
blockstore.insert_shreds(shreds, None, false).unwrap();
let mut write_batch = blockstore.get_write_batch().unwrap();
blockstore
.purge_special_columns_exact(&mut write_batch, slot, slot + 1)
.unwrap();
}
#[test]
fn test_purge_special_columns_compaction_filter() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let max_slot = 19;
clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
let oldest_slot = 3;
blockstore.db.set_oldest_slot(oldest_slot);
blockstore.transaction_status_cf.compact();
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, max_slot - (oldest_slot - 1));
clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot);
let oldest_slot = 12;
blockstore.db.set_oldest_slot(oldest_slot);
blockstore.transaction_status_cf.compact();
let status_entry_iterator = blockstore
.transaction_status_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in status_entry_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, max_slot - (oldest_slot - 1));
}
#[test]
fn test_purge_transaction_memos_compaction_filter() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let oldest_slot = 5;
fn random_signature() -> Signature {
use rand::Rng;
let mut key = [0u8; 64];
rand::thread_rng().fill(&mut key[..]);
Signature::from(key)
}
blockstore
.transaction_memos_cf
.put_deprecated(random_signature(), &"this is a memo".to_string())
.unwrap();
blockstore
.transaction_memos_cf
.put_deprecated(random_signature(), &"another memo".to_string())
.unwrap();
blockstore.db.set_clean_slot_0(false);
blockstore
.transaction_memos_cf
.put(
(random_signature(), oldest_slot - 1),
&"this is a new memo in slot 4".to_string(),
)
.unwrap();
blockstore
.transaction_memos_cf
.put(
(random_signature(), oldest_slot),
&"this is a memo in slot 5 ".to_string(),
)
.unwrap();
blockstore.db.set_oldest_slot(0);
blockstore.transaction_memos_cf.compact();
let num_memos = blockstore
.transaction_memos_cf
.iter(IteratorMode::Start)
.unwrap()
.count();
assert_eq!(num_memos, 4);
blockstore.db.set_oldest_slot(oldest_slot);
blockstore.transaction_memos_cf.compact();
let memos_iterator = blockstore
.transaction_memos_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in memos_iterator {
assert!(slot == 0 || slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 3);
blockstore.db.set_clean_slot_0(true);
blockstore.transaction_memos_cf.compact();
let memos_iterator = blockstore
.transaction_memos_cf
.iter(IteratorMode::Start)
.unwrap();
let mut count = 0;
for ((_signature, slot), _value) in memos_iterator {
assert!(slot >= oldest_slot);
count += 1;
}
assert_eq!(count, 1);
}
#[test]
fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_many_slot_entries(0, 10, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(matches!(
blockstore.purge_slot_cleanup_chaining(11).unwrap_err(),
BlockstoreError::SlotUnavailable
));
}
#[test]
fn test_purge_slot_cleanup_chaining() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_many_slot_entries(0, 10, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
let (slot_11, _) = make_slot_entries(11, 4, 5);
blockstore.insert_shreds(slot_11, None, false).unwrap();
let (slot_12, _) = make_slot_entries(12, 5, 5);
blockstore.insert_shreds(slot_12, None, false).unwrap();
blockstore.purge_slot_cleanup_chaining(5).unwrap();
let slot_meta = blockstore.meta(5).unwrap().unwrap();
let expected_slot_meta = SlotMeta {
slot: 5,
next_slots: vec![6, 12],
..SlotMeta::default()
};
assert_eq!(slot_meta, expected_slot_meta);
let parent_slot_meta = blockstore.meta(4).unwrap().unwrap();
assert_eq!(parent_slot_meta.next_slots, vec![11]);
let child_slot_meta = blockstore.meta(6).unwrap().unwrap();
assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
let child_slot_meta = blockstore.meta(12).unwrap().unwrap();
assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
}
}