use {
crate::{
ContentHash,
Ident,
database::{
Database,
GenerationEpoch,
HasPartition,
PartitionKey,
PartitionStore,
RecordHandle,
chunk::RecordWriter,
partitions::RefcountOps,
reaper::{DeferredDecrement, Reaper, RefCountIncrementer, RefCountDecrementer},
storage::Partitions,
},
record::References,
},
std::sync::Arc,
super::storage::{
TestPartition,
Test1Partition,
TestPartitions,
TestRecordData,
},
};
#[test]
fn test_refcount_increment_on_index_insert() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record = TestRecordData::Function {
params: vec![Ident::new("x")],
return_type: "int".to_string(),
};
let hash = crate::record::Record::content_hash(&record);
let mut writer = RecordWriter::<TestPartitions>::new(
Ident::new("task1"),
);
writer.insert::<TestPartition, _>("func::main".to_string(), record);
let chunk = writer.build();
let result = db.commit_chunk(chunk, &source_cache);
assert!(result.new_hashes.iter().any(|r| r.hash == hash));
assert!(result.deferred_decrements.is_empty());
let refcount = db.cas.get_refcount(TestPartition::KEY, hash);
assert_eq!(refcount, 1);
}
#[test]
fn test_refcount_decrement_on_index_overwrite() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record1 = TestRecordData::Function {
params: vec![Ident::new("x")],
return_type: "int".to_string(),
};
let hash1 = crate::record::Record::content_hash(&record1);
let mut writer1 = RecordWriter::<TestPartitions>::new(
Ident::new("task1"),
);
writer1.insert::<TestPartition, _>("func::main".to_string(), record1);
let chunk1 = writer1.build();
let _ = db.commit_chunk(chunk1, &source_cache);
let record2 = TestRecordData::Function {
params: vec![Ident::new("y")],
return_type: "string".to_string(),
};
let hash2 = crate::record::Record::content_hash(&record2);
let mut writer2 = RecordWriter::<TestPartitions>::new(
Ident::new("task2"),
);
writer2.insert::<TestPartition, _>("func::main".to_string(), record2);
let chunk2 = writer2.build();
let result = db.commit_chunk(chunk2, &source_cache);
assert_eq!(result.deferred_decrements.len(), 1);
assert_eq!(result.deferred_decrements[0].hash, hash1);
let refcount2 = db.cas.get_refcount(TestPartition::KEY, hash2);
assert_eq!(refcount2, 1);
let refcount1 = db.cas.get_refcount(TestPartition::KEY, hash1);
assert_eq!(refcount1, 1);
}
#[test]
fn test_scoped_deletion_queues_decrements() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record_a1 = TestRecordData::Function {
params: vec![],
return_type: "void".to_string(),
};
let hash_a1 = crate::record::Record::content_hash(&record_a1);
let record_a2 = TestRecordData::Function {
params: vec![Ident::new("x")],
return_type: "int".to_string(),
};
let hash_a2 = crate::record::Record::content_hash(&record_a2);
let record_b1 = TestRecordData::Function {
params: vec![],
return_type: "bool".to_string(),
};
let hash_b1 = crate::record::Record::content_hash(&record_b1);
let mut writer = RecordWriter::<TestPartitions>::new(
Ident::new("task1"),
);
writer.insert::<TestPartition, _>("file::a::func1".to_string(), record_a1);
writer.insert::<TestPartition, _>("file::a::func2".to_string(), record_a2);
writer.insert::<TestPartition, _>("file::b::func1".to_string(), record_b1);
let chunk = writer.build();
let _ = db.commit_chunk(chunk, &source_cache);
let new_record = TestRecordData::Function {
params: vec![Ident::new("new")],
return_type: "new".to_string(),
};
let mut writer2 = RecordWriter::<TestPartitions>::new(
Ident::new("task2"),
);
writer2.clear_prefix(TestPartition::KEY, "file::a::");
writer2.insert::<TestPartition, _>("file::a::newfunc".to_string(), new_record);
let chunk2 = writer2.build();
let result = db.commit_chunk(chunk2, &source_cache);
assert_eq!(result.deferred_decrements.len(), 2, "Should have exactly 2 deferred decrements");
let deferred_hashes: std::collections::HashSet<_> = result.deferred_decrements.iter().map(|d| d.hash).collect();
assert!(
deferred_hashes.contains(&hash_a1),
"file::a::func1 hash should be queued for decrement"
);
assert!(
deferred_hashes.contains(&hash_a2),
"file::a::func2 hash should be queued for decrement"
);
assert!(
!deferred_hashes.contains(&hash_b1),
"file::b::func1 hash should NOT be queued (outside scope)"
);
for dd in &result.deferred_decrements {
assert_eq!(
dd.partition, TestPartition::KEY,
"All deferred decrements should be for TestPartition"
);
}
assert_eq!(
db.cas.get_refcount(TestPartition::KEY, hash_b1),
1,
"file::b::func1 should still have refcount 1"
);
}
#[test]
fn test_reaper_processes_decrements() {
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let hash = ContentHash::new(&[10, 20, 30]);
stores.increment_refcount(TestPartition::KEY, hash);
stores.increment_refcount(TestPartition::KEY, hash);
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 2);
reaper.queue_decrement(
TestPartition::KEY,
hash,
GenerationEpoch::new(0),
);
reaper.queue_decrement(
TestPartition::KEY,
hash,
GenerationEpoch::new(1),
);
assert_eq!(reaper.queue_len(), 2);
let removed = reaper.reap(GenerationEpoch::new(0), usize::MAX);
assert_eq!(removed, 0);
assert_eq!(reaper.queue_len(), 2);
let removed = reaper.reap(GenerationEpoch::new(1), usize::MAX);
assert_eq!(removed, 0); assert_eq!(reaper.queue_len(), 1);
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 1);
let removed = reaper.reap(GenerationEpoch::new(2), usize::MAX);
assert_eq!(removed, 1); assert!(reaper.is_empty());
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 0);
}
#[test]
fn test_reinsert_same_key_hash_increments_without_decrement() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record = TestRecordData::Struct {
fields: vec![(Ident::new("x"), "int".to_string())],
};
let hash = crate::record::Record::content_hash(&record);
let mut writer1 = RecordWriter::<TestPartitions>::new(
Ident::new("task1"),
);
writer1.insert::<TestPartition, _>("struct::Point".to_string(), record.clone());
let chunk1 = writer1.build();
let _ = db.commit_chunk(chunk1, &source_cache);
let refcount1 = db.cas.get_refcount(TestPartition::KEY, hash);
assert_eq!(refcount1, 1);
let mut writer2 = RecordWriter::<TestPartitions>::new(
Ident::new("task2"),
);
writer2.insert::<TestPartition, _>("struct::Point".to_string(), record);
let chunk2 = writer2.build();
let result = db.commit_chunk(chunk2, &source_cache);
assert!(result.deferred_decrements.is_empty());
let refcount2 = db.cas.get_refcount(TestPartition::KEY, hash);
assert_eq!(refcount2, 2);
}
#[test]
fn test_multiple_index_keys_same_hash() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record = TestRecordData::Module {
exports: vec![Ident::new("shared")],
};
let hash = crate::record::Record::content_hash(&record);
let mut writer1 = RecordWriter::<TestPartitions>::new(
Ident::new("task1"),
);
writer1.insert::<TestPartition, _>("module::A".to_string(), record.clone());
let chunk1 = writer1.build();
let _ = db.commit_chunk(chunk1, &source_cache);
let refcount1 = db.cas.get_refcount(TestPartition::KEY, hash);
assert_eq!(refcount1, 1, "First index entry should give refcount 1");
let mut writer2 = RecordWriter::<TestPartitions>::new(
Ident::new("task2"),
);
writer2.insert::<TestPartition, _>("module::B".to_string(), record);
let chunk2 = writer2.build();
let result = db.commit_chunk(chunk2, &source_cache);
assert!(result.deferred_decrements.is_empty(), "New key should not queue decrements");
let refcount2 = db.cas.get_refcount(TestPartition::KEY, hash);
assert_eq!(refcount2, 2, "Two index entries should give refcount 2");
let different_record = TestRecordData::Module {
exports: vec![Ident::new("different")],
};
let different_hash = crate::record::Record::content_hash(&different_record);
let mut writer3 = RecordWriter::<TestPartitions>::new(
Ident::new("task3"),
);
writer3.insert::<TestPartition, _>("module::A".to_string(), different_record);
let chunk3 = writer3.build();
let result = db.commit_chunk(chunk3, &source_cache);
assert_eq!(result.deferred_decrements.len(), 1, "Overwriting should queue 1 decrement");
assert_eq!(result.deferred_decrements[0].hash, hash, "Decrement should be for original hash");
let refcount3 = db.cas.get_refcount(TestPartition::KEY, hash);
assert_eq!(refcount3, 2, "Refcount still 2 before reaping");
let new_refcount = db.cas.get_refcount(TestPartition::KEY, different_hash);
assert_eq!(new_refcount, 1, "New record should have refcount 1");
}
#[test]
fn test_refcount_multiple_partitions() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record1 = TestRecordData::Function {
params: vec![],
return_type: "void".to_string(),
};
let hash1 = crate::record::Record::content_hash(&record1);
let record2 = TestRecordData::Module {
exports: vec![Ident::new("a")],
};
let hash2 = crate::record::Record::content_hash(&record2);
let mut writer = RecordWriter::<TestPartitions>::new(
Ident::new("task1"),
);
writer.insert::<TestPartition, _>("key1".to_string(), record1);
writer.insert::<Test1Partition, _>("key2".to_string(), record2);
let chunk = writer.build();
let _ = db.commit_chunk(chunk, &source_cache);
let refcount1 = db.cas.get_refcount(TestPartition::KEY, hash1);
let refcount2 = db.cas.get_refcount(Test1Partition::KEY, hash2);
assert_eq!(refcount1, 1);
assert_eq!(refcount2, 1);
let wrong_partition = db.cas.get_refcount(Test1Partition::KEY, hash1);
assert_eq!(wrong_partition, 0);
}
#[test]
fn test_refcount_ops_hlist() {
let stores = TestPartitions::new_stores();
let hash = ContentHash::new(&[1, 2, 3, 4]);
stores.increment_refcount(TestPartition::KEY, hash);
stores.increment_refcount(TestPartition::KEY, hash);
let count = stores.get_refcount(TestPartition::KEY, hash);
assert_eq!(count, 2);
let new_count = stores.decrement_refcount(TestPartition::KEY, hash);
assert_eq!(new_count, 1);
let new_count = stores.decrement_refcount(TestPartition::KEY, hash);
assert_eq!(new_count, 0);
}
#[test]
fn test_deferred_decrement_fields() {
let dd = DeferredDecrement {
partition: Ident::new("test::partition"),
hash: ContentHash::new(&[5, 6, 7, 8]),
from_epoch: GenerationEpoch::new(42),
};
assert_eq!(dd.partition, Ident::new("test::partition"));
assert_eq!(dd.from_epoch, GenerationEpoch::new(42));
}
#[test]
fn test_refcount_incrementer_increments_references() {
let stores = TestPartitions::new_stores();
let hash1 = ContentHash::new(&[1, 1, 1, 1]);
let hash2 = ContentHash::new(&[2, 2, 2, 2]);
let handle1: RecordHandle<TestPartition> = RecordHandle::new(hash1);
let handle2: RecordHandle<Test1Partition> = RecordHandle::new(hash2);
assert_eq!(stores.get_refcount(TestPartition::KEY, hash1), 0);
assert_eq!(stores.get_refcount(Test1Partition::KEY, hash2), 0);
{
let mut incrementer = RefCountIncrementer::<TestPartitions>::new(&stores);
incrementer.add(handle1);
incrementer.add(handle2);
}
assert_eq!(stores.get_refcount(TestPartition::KEY, hash1), 1);
assert_eq!(stores.get_refcount(Test1Partition::KEY, hash2), 1);
{
let mut incrementer = RefCountIncrementer::<TestPartitions>::new(&stores);
incrementer.add(handle1);
}
assert_eq!(stores.get_refcount(TestPartition::KEY, hash1), 2);
assert_eq!(stores.get_refcount(Test1Partition::KEY, hash2), 1);
}
#[test]
fn test_refcount_decrementer_queues_decrements() {
let hash1 = ContentHash::new(&[3, 3, 3, 3]);
let hash2 = ContentHash::new(&[4, 4, 4, 4]);
let handle1: RecordHandle<TestPartition> = RecordHandle::new(hash1);
let handle2: RecordHandle<Test1Partition> = RecordHandle::new(hash2);
let mut decrements = Vec::new();
let epoch = GenerationEpoch::new(42);
{
let mut decrementer = RefCountDecrementer::new(&mut decrements, epoch);
<RefCountDecrementer as References<TestPartitions>>::add(&mut decrementer, handle1);
<RefCountDecrementer as References<TestPartitions>>::add(&mut decrementer, handle2);
}
assert_eq!(decrements.len(), 2);
assert_eq!(decrements[0].partition, TestPartition::KEY);
assert_eq!(decrements[0].hash, hash1);
assert_eq!(decrements[0].from_epoch, epoch);
assert_eq!(decrements[1].partition, Test1Partition::KEY);
assert_eq!(decrements[1].hash, hash2);
assert_eq!(decrements[1].from_epoch, epoch);
}
#[test]
fn test_decrement_refcount_underflow_protection() {
let stores = TestPartitions::new_stores();
let hash = ContentHash::new(&[99, 99, 99, 99]);
let result = stores.decrement_refcount(TestPartition::KEY, hash);
assert_eq!(result, 0, "Decrementing non-existent refcount should return 0");
let result = stores.decrement_refcount(TestPartition::KEY, hash);
assert_eq!(result, 0, "Repeated decrement of non-existent refcount should return 0");
stores.increment_refcount(TestPartition::KEY, hash);
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 1);
let result = stores.decrement_refcount(TestPartition::KEY, hash);
assert_eq!(result, 0, "Decrementing from 1 should return 0");
let result = stores.decrement_refcount(TestPartition::KEY, hash);
assert_eq!(result, 0, "Decrementing already-zero refcount should return 0");
}
#[test]
fn test_end_to_end_gc_lifecycle() {
let db: Database<TestPartitions> = Database::new();
let source_cache = crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
let record1 = TestRecordData::Function {
params: vec![Ident::new("original")],
return_type: "void".to_string(),
};
let hash1 = crate::record::Record::content_hash(&record1);
let mut writer1 = RecordWriter::<TestPartitions>::new(Ident::new("task1"));
writer1.insert::<TestPartition, _>("lifecycle::func".to_string(), record1);
let chunk1 = writer1.build();
let result1 = db.commit_chunk(chunk1, &source_cache);
assert!(result1.new_hashes.iter().any(|r| r.hash == hash1), "Record should be newly inserted");
assert_eq!(db.cas.get_refcount(TestPartition::KEY, hash1), 1, "Refcount should be 1");
let store: &PartitionStore<TestPartition> = db.cas.stores().store();
assert!(store.get(&hash1).is_some(), "Record should exist in store");
let record2 = TestRecordData::Function {
params: vec![Ident::new("replacement")],
return_type: "int".to_string(),
};
let hash2 = crate::record::Record::content_hash(&record2);
let mut writer2 = RecordWriter::<TestPartitions>::new(Ident::new("task2"));
writer2.insert::<TestPartition, _>("lifecycle::func".to_string(), record2);
let chunk2 = writer2.build();
let result2 = db.commit_chunk(chunk2, &source_cache);
assert!(result2.new_hashes.iter().any(|r| r.hash == hash2), "New record should be inserted");
assert_eq!(result2.deferred_decrements.len(), 1, "Should have one deferred decrement");
assert_eq!(result2.deferred_decrements[0].hash, hash1, "Deferred decrement should be for old hash");
assert_eq!(result2.deferred_decrements[0].partition, TestPartition::KEY, "Should be correct partition");
assert_eq!(db.cas.get_refcount(TestPartition::KEY, hash1), 1, "Old hash still has refcount");
assert_eq!(db.cas.get_refcount(TestPartition::KEY, hash2), 1, "New hash has refcount");
assert!(store.get(&hash1).is_some(), "Old record should still exist (not reaped yet)");
assert!(store.get(&hash2).is_some(), "New record should exist");
let new_refcount = db.cas.decrement_refcount(TestPartition::KEY, hash1);
assert_eq!(new_refcount, 0, "Refcount should drop to 0");
let removed = store.remove_record(hash1);
assert!(removed.is_some(), "Record should be removed");
assert!(store.get(&hash1).is_none(), "Old record should be removed from store");
assert_eq!(db.cas.get_refcount(TestPartition::KEY, hash1), 0, "Refcount should be 0");
assert!(store.get(&hash2).is_some(), "New record should still exist");
assert_eq!(db.cas.get_refcount(TestPartition::KEY, hash2), 1, "New refcount should be 1");
}
#[test]
fn test_reaper_removes_record_from_store_on_zero_refcount() {
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let record = TestRecordData::Module {
exports: vec![Ident::new("test_export")],
};
let hash = crate::record::Record::content_hash(&record);
let store: &PartitionStore<TestPartition> = stores.store();
store.insert(hash, record, GenerationEpoch::new(0));
assert!(store.get(&hash).is_some(), "Record should exist after insert");
stores.increment_refcount(TestPartition::KEY, hash);
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 1);
reaper.queue_decrement(TestPartition::KEY, hash, GenerationEpoch::new(0));
let removed = reaper.reap(GenerationEpoch::new(1), usize::MAX);
assert_eq!(removed, 1, "Should report 1 record removed");
assert!(store.get(&hash).is_none(), "Record should be removed from store");
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 0, "Refcount should be 0");
}
#[test]
fn test_cascading_no_references_record() {
use crate::record::Record;
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let record = TestRecordData::Module {
exports: vec![Ident::new("test")],
};
let record_hash = record.content_hash();
let store: &PartitionStore<TestPartition> = stores.store();
store.insert(record_hash, record, GenerationEpoch::new(0));
stores.increment_refcount(TestPartition::KEY, record_hash);
assert_eq!(stores.get_refcount(TestPartition::KEY, record_hash), 1);
reaper.queue_decrement(TestPartition::KEY, record_hash, GenerationEpoch::new(0));
let removed = reaper.reap(GenerationEpoch::new(1), usize::MAX);
assert_eq!(removed, 1, "Record should be removed");
assert_eq!(stores.get_refcount(TestPartition::KEY, record_hash), 0);
assert!(
store.get(&record_hash).is_none(),
"Record should be removed from store"
);
}
#[test]
fn test_cascading_deletes_with_references() {
use crate::record::Record;
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let store: &PartitionStore<TestPartition> = stores.store();
let child1 = TestRecordData::Module {
exports: vec![Ident::new("child1")],
};
let child1_hash = child1.content_hash();
let child2 = TestRecordData::Module {
exports: vec![Ident::new("child2")],
};
let child2_hash = child2.content_hash();
let parent = TestRecordData::WithRefs {
children: vec![child1_hash, child2_hash],
};
let parent_hash = parent.content_hash();
store.insert(child1_hash, child1, GenerationEpoch::new(0));
store.insert(child2_hash, child2, GenerationEpoch::new(0));
store.insert(parent_hash, parent, GenerationEpoch::new(0));
stores.increment_refcount(TestPartition::KEY, parent_hash);
stores.increment_refcount(TestPartition::KEY, child1_hash);
stores.increment_refcount(TestPartition::KEY, child2_hash);
assert_eq!(stores.get_refcount(TestPartition::KEY, parent_hash), 1);
assert_eq!(stores.get_refcount(TestPartition::KEY, child1_hash), 1);
assert_eq!(stores.get_refcount(TestPartition::KEY, child2_hash), 1);
reaper.queue_decrement(TestPartition::KEY, parent_hash, GenerationEpoch::new(0));
let removed = reaper.reap(GenerationEpoch::new(1), usize::MAX);
assert_eq!(removed, 3, "Parent and both children should be removed via cascading");
assert_eq!(stores.get_refcount(TestPartition::KEY, parent_hash), 0);
assert_eq!(stores.get_refcount(TestPartition::KEY, child1_hash), 0);
assert_eq!(stores.get_refcount(TestPartition::KEY, child2_hash), 0);
assert!(store.get(&parent_hash).is_none(), "Parent should be removed from store");
assert!(store.get(&child1_hash).is_none(), "Child1 should be removed from store");
assert!(store.get(&child2_hash).is_none(), "Child2 should be removed from store");
assert!(reaper.is_empty(), "Reaper queue should be empty");
}
#[test]
fn test_reaper_concurrent_queue_operations() {
use std::thread;
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Arc::new(Reaper::<TestPartitions>::new(Arc::clone(&stores)));
const NUM_THREADS: usize = 8;
const DECREMENTS_PER_THREAD: usize = 100;
let handles: Vec<_> = (0..NUM_THREADS)
.map(|thread_id| {
let reaper = Arc::clone(&reaper);
thread::spawn(move || {
for i in 0..DECREMENTS_PER_THREAD {
let hash = ContentHash::new(&[thread_id as u8, i as u8, 0, 0]);
reaper.queue_decrement(
TestPartition::KEY,
hash,
GenerationEpoch::new((thread_id * DECREMENTS_PER_THREAD + i) as u64),
);
}
})
})
.collect();
for handle in handles {
handle.join().expect("Thread should not panic");
}
assert_eq!(
reaper.queue_len(),
NUM_THREADS * DECREMENTS_PER_THREAD,
"All decrements should be queued"
);
}
#[test]
fn test_reaper_queue_decrements_batch() {
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let decrements: Vec<DeferredDecrement> = (0..10)
.map(|i| DeferredDecrement {
partition: if i % 2 == 0 {
TestPartition::KEY
} else {
Test1Partition::KEY
},
hash: ContentHash::new(&[i as u8, i as u8, i as u8, i as u8]),
from_epoch: GenerationEpoch::new(i as u64),
})
.collect();
assert!(reaper.is_empty(), "Queue should start empty");
reaper.queue_decrements(decrements.clone());
assert_eq!(reaper.queue_len(), 10, "All 10 decrements should be queued");
let more_decrements: Vec<DeferredDecrement> = (10..15)
.map(|i| DeferredDecrement {
partition: TestPartition::KEY,
hash: ContentHash::new(&[i as u8, 0, 0, 0]),
from_epoch: GenerationEpoch::new(i as u64),
})
.collect();
reaper.queue_decrements(more_decrements);
assert_eq!(reaper.queue_len(), 15, "Total should be 15 decrements");
for dd in &decrements {
stores.increment_refcount(dd.partition, dd.hash);
}
let removed = reaper.reap(GenerationEpoch::new(5), usize::MAX);
assert_eq!(removed, 5, "Should remove 5 records with refcount 1");
assert_eq!(reaper.queue_len(), 10, "10 items should remain in queue");
}
#[test]
fn test_reaper_epoch_boundary_exact_match() {
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let hash = ContentHash::new(&[50, 51, 52, 53]);
stores.increment_refcount(TestPartition::KEY, hash);
assert_eq!(stores.get_refcount(TestPartition::KEY, hash), 1);
reaper.queue_decrement(TestPartition::KEY, hash, GenerationEpoch::new(5));
assert_eq!(reaper.queue_len(), 1);
let removed = reaper.reap(GenerationEpoch::new(5), usize::MAX);
assert_eq!(removed, 0, "Should NOT process when from_epoch == oldest_running_epoch");
assert_eq!(reaper.queue_len(), 1, "Queue should still have 1 item");
assert_eq!(
stores.get_refcount(TestPartition::KEY, hash),
1,
"Refcount should be unchanged"
);
let removed = reaper.reap(GenerationEpoch::new(6), usize::MAX);
assert_eq!(removed, 1, "Should process when from_epoch < oldest_running_epoch");
assert!(reaper.is_empty(), "Queue should be empty");
assert_eq!(
stores.get_refcount(TestPartition::KEY, hash),
0,
"Refcount should be 0"
);
}
#[test]
fn test_concurrent_refcount_operations() {
use std::thread;
let stores = Arc::new(TestPartitions::new_stores());
let hash = ContentHash::new(&[77, 78, 79, 80]);
const NUM_THREADS: usize = 4;
const OPS_PER_THREAD: usize = 1000;
for _ in 0..NUM_THREADS * OPS_PER_THREAD {
stores.increment_refcount(TestPartition::KEY, hash);
}
let initial = stores.get_refcount(TestPartition::KEY, hash);
assert_eq!(initial, NUM_THREADS * OPS_PER_THREAD);
let handles: Vec<_> = (0..NUM_THREADS)
.map(|_| {
let stores = Arc::clone(&stores);
thread::spawn(move || {
for _ in 0..OPS_PER_THREAD {
stores.increment_refcount(TestPartition::KEY, hash);
stores.decrement_refcount(TestPartition::KEY, hash);
}
})
})
.collect();
for handle in handles {
handle.join().expect("Thread should not panic");
}
let final_count = stores.get_refcount(TestPartition::KEY, hash);
assert_eq!(
final_count, initial,
"Concurrent inc/dec should preserve count"
);
}
#[test]
fn test_reap_budget_limits_processing() {
use crate::record::Record;
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let store: &PartitionStore<TestPartition> = stores.store();
let r1 = TestRecordData::Module {
exports: vec![Ident::new("r1")],
};
let r1_hash = r1.content_hash();
let r2 = TestRecordData::Module {
exports: vec![Ident::new("r2")],
};
let r2_hash = r2.content_hash();
let r3 = TestRecordData::Module {
exports: vec![Ident::new("r3")],
};
let r3_hash = r3.content_hash();
store.insert(r1_hash, r1, GenerationEpoch::new(0));
store.insert(r2_hash, r2, GenerationEpoch::new(0));
store.insert(r3_hash, r3, GenerationEpoch::new(0));
stores.increment_refcount(TestPartition::KEY, r1_hash);
stores.increment_refcount(TestPartition::KEY, r2_hash);
stores.increment_refcount(TestPartition::KEY, r3_hash);
reaper.queue_decrement(TestPartition::KEY, r1_hash, GenerationEpoch::new(0));
reaper.queue_decrement(TestPartition::KEY, r2_hash, GenerationEpoch::new(0));
reaper.queue_decrement(TestPartition::KEY, r3_hash, GenerationEpoch::new(0));
assert_eq!(reaper.queue_len(), 3);
let removed = reaper.reap(GenerationEpoch::new(1), 1);
assert_eq!(removed, 1);
assert_eq!(reaper.queue_len(), 2, "2 decrements should remain in queue");
let removed = reaper.reap(GenerationEpoch::new(1), 1);
assert_eq!(removed, 1);
assert_eq!(reaper.queue_len(), 1);
let removed = reaper.reap(GenerationEpoch::new(1), 1);
assert_eq!(removed, 1);
assert!(reaper.is_empty());
}
#[test]
fn test_reap_budget_with_cascading() {
use crate::record::Record;
let stores = Arc::new(TestPartitions::new_stores());
let reaper = Reaper::<TestPartitions>::new(Arc::clone(&stores));
let store: &PartitionStore<TestPartition> = stores.store();
let child1 = TestRecordData::Module {
exports: vec![Ident::new("cascade_child1")],
};
let child1_hash = child1.content_hash();
let child2 = TestRecordData::Module {
exports: vec![Ident::new("cascade_child2")],
};
let child2_hash = child2.content_hash();
let parent = TestRecordData::WithRefs {
children: vec![child1_hash, child2_hash],
};
let parent_hash = parent.content_hash();
store.insert(child1_hash, child1, GenerationEpoch::new(0));
store.insert(child2_hash, child2, GenerationEpoch::new(0));
store.insert(parent_hash, parent, GenerationEpoch::new(0));
stores.increment_refcount(TestPartition::KEY, parent_hash);
stores.increment_refcount(TestPartition::KEY, child1_hash);
stores.increment_refcount(TestPartition::KEY, child2_hash);
reaper.queue_decrement(TestPartition::KEY, parent_hash, GenerationEpoch::new(0));
assert_eq!(reaper.queue_len(), 1);
let removed = reaper.reap(GenerationEpoch::new(1), 1);
assert_eq!(removed, 1, "Parent should be removed");
assert_eq!(reaper.queue_len(), 2, "Cascaded child decrements should be queued");
assert!(store.get(&parent_hash).is_none());
assert!(store.get(&child1_hash).is_some());
assert!(store.get(&child2_hash).is_some());
let removed = reaper.reap(GenerationEpoch::new(1), usize::MAX);
assert_eq!(removed, 2, "Both children should be removed");
assert!(reaper.is_empty());
assert!(store.get(&child1_hash).is_none());
assert!(store.get(&child2_hash).is_none());
}