use {
crate::{
ContentHash,
Ident,
database::{
ContentHashRef,
GenerationEpoch,
HasPartition,
PartitionKey,
PartitionStore,
chunk::RecordWriter,
gc::GcPhase,
storage::Partitions,
},
record::Record,
},
super::common::{
TestPartition,
TestPartitions,
TestRecordData,
test_scheduler,
},
};
/// Builds a [`TestRecordData`] fixture from `name`, passing `0` as the second
/// constructor argument.
fn make_record(name: &str) -> TestRecordData {
    let owned_name = String::from(name);
    TestRecordData::new(owned_name, 0)
}
/// Returns an empty source-cache reader created through its test-only constructor.
fn source_cache() -> crate::source::cache::reporter::SourceCacheReader {
    use crate::source::cache::reporter::SourceCacheReader;

    SourceCacheReader::new_empty_for_test()
}
/// When the test registers no active epochs, `oldest_running_epoch` must equal
/// the database's current epoch.
#[test]
fn no_running_tasks_returns_current_epoch() {
    let (sched, _conn) = test_scheduler();

    let expected = sched.db.get_current_epoch();

    assert_eq!(sched.oldest_running_epoch(), expected);
}
/// A single registered epoch is reported back as the oldest running epoch.
#[test]
fn one_task_returns_its_epoch() {
    let (sched, _conn) = test_scheduler();

    sched.register_active_epoch(GenerationEpoch::new(5));

    assert_eq!(sched.oldest_running_epoch(), GenerationEpoch::new(5));
}
/// With epochs 7, 3 and 12 registered (in that order), the oldest running
/// epoch is the minimum, 3.
#[test]
fn multiple_tasks_returns_minimum() {
    let (sched, _conn) = test_scheduler();

    for raw in [7, 3, 12] {
        sched.register_active_epoch(GenerationEpoch::new(raw));
    }

    assert_eq!(sched.oldest_running_epoch(), GenerationEpoch::new(3));
}
/// Registering the same epoch twice yields a count of 2 for that epoch in
/// `active_epochs`.
#[test]
fn multiple_tasks_same_epoch() {
    let (sched, _conn) = test_scheduler();

    for _ in 0..2 {
        sched.register_active_epoch(GenerationEpoch::new(5));
    }

    // A missing entry is treated as a count of zero.
    let registered = sched
        .active_epochs
        .lock()
        .get(&GenerationEpoch::new(5))
        .copied()
        .unwrap_or(0);
    assert_eq!(registered, 2);
}
/// After two registrations and one deregistration of epoch 5, the count drops
/// to 1 and epoch 5 is still reported as the oldest running epoch.
#[test]
fn deregister_one_of_two_same_epoch() {
    let (sched, _conn) = test_scheduler();

    sched.register_active_epoch(GenerationEpoch::new(5));
    sched.register_active_epoch(GenerationEpoch::new(5));
    sched.deregister_active_epoch(GenerationEpoch::new(5));

    {
        let active = sched.active_epochs.lock();
        let remaining = active.get(&GenerationEpoch::new(5)).copied().unwrap_or(0);
        assert_eq!(remaining, 1);
        // The guard is released at the end of this scope, before the
        // scheduler is queried again.
    }

    assert_eq!(sched.oldest_running_epoch(), GenerationEpoch::new(5));
}
/// Once the oldest registered epoch (3) is deregistered, the oldest running
/// epoch moves to the next registered one (7).
#[test]
fn oldest_task_finishes_epoch_advances() {
    let (sched, _conn) = test_scheduler();

    for raw in [3, 7] {
        sched.register_active_epoch(GenerationEpoch::new(raw));
    }
    let before = sched.oldest_running_epoch();
    assert_eq!(before, GenerationEpoch::new(3));

    sched.deregister_active_epoch(GenerationEpoch::new(3));
    let after = sched.oldest_running_epoch();
    assert_eq!(after, GenerationEpoch::new(7));
}
/// After every registered epoch has been deregistered, the oldest running
/// epoch equals the database's current epoch again.
#[test]
fn all_tasks_finish_returns_current() {
    let (sched, _conn) = test_scheduler();

    for raw in [3, 7] {
        sched.register_active_epoch(GenerationEpoch::new(raw));
    }
    for raw in [3, 7] {
        sched.deregister_active_epoch(GenerationEpoch::new(raw));
    }

    let expected = sched.db.get_current_epoch();
    assert_eq!(sched.oldest_running_epoch(), expected);
}
/// Deregistering an epoch that was never registered leaves `active_epochs`
/// empty.
#[test]
fn deregister_nonexistent_epoch_noop() {
    let (sched, _conn) = test_scheduler();

    sched.deregister_active_epoch(GenerationEpoch::new(999));

    assert!(sched.active_epochs.lock().is_empty());
}
/// Overwriting `key::a` with a different record makes the second commit
/// report a deferred decrement carrying the hash of the original record.
#[test]
fn overwrite_produces_decrements_in_reaper() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // First commit: store the original record under `key::a`.
    let original = make_record("first");
    let original_hash = original.content_hash();
    let mut initial = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    initial.insert::<TestPartition, _>(String::from("key::a"), original);
    let initial_chunk = initial.build();
    let _ = sched.db.commit_chunk(initial_chunk, &cache);

    // Second commit: overwrite the same key.
    let replacement = make_record("second");
    let mut overwrite = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    overwrite.insert::<TestPartition, _>(String::from("key::a"), replacement);
    let overwrite_chunk = overwrite.build();
    let outcome = sched.db.commit_chunk(overwrite_chunk, &cache);

    let decrements = &outcome.deferred_decrements;
    assert!(!decrements.is_empty());
    assert!(decrements.iter().any(|entry| entry.hash == original_hash));
}
/// Inserting under a key that this test has not written before produces no
/// deferred decrements.
#[test]
fn insert_no_overwrite_no_decrements() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    let fresh = make_record("fresh");
    let mut chunk_writer = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    chunk_writer.insert::<TestPartition, _>(String::from("key::fresh"), fresh);
    let chunk = chunk_writer.build();

    let outcome = sched.db.commit_chunk(chunk, &cache);
    assert!(outcome.deferred_decrements.is_empty());
}
/// The deferred decrements of an overwriting commit can be moved out with
/// `std::mem::take`, leaving the field on the commit result empty.
#[test]
fn decrements_extracted_via_mem_take() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // Seed `key::a`.
    let original = make_record("first");
    let mut initial = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    initial.insert::<TestPartition, _>(String::from("key::a"), original);
    let initial_chunk = initial.build();
    let _ = sched.db.commit_chunk(initial_chunk, &cache);

    // Overwrite `key::a` so the commit carries decrements.
    let replacement = make_record("second");
    let mut overwrite = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    overwrite.insert::<TestPartition, _>(String::from("key::a"), replacement);
    let overwrite_chunk = overwrite.build();
    let mut outcome = sched.db.commit_chunk(overwrite_chunk, &cache);

    let taken = std::mem::take(&mut outcome.deferred_decrements);
    assert!(outcome.deferred_decrements.is_empty());
    assert!(!taken.is_empty());
}
/// Overwriting two keys in a single commit reports exactly two deferred
/// decrements.
#[test]
fn multiple_overwrites_accumulate() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // Seed both keys with their first versions.
    let mut seed = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    for (key, name) in [("key::x", "v1a"), ("key::y", "v1b")] {
        seed.insert::<TestPartition, _>(key.to_string(), make_record(name));
    }
    let seed_chunk = seed.build();
    let _ = sched.db.commit_chunk(seed_chunk, &cache);

    // Overwrite both keys in one chunk.
    let mut overwrite = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    for (key, name) in [("key::x", "v2a"), ("key::y", "v2b")] {
        overwrite.insert::<TestPartition, _>(key.to_string(), make_record(name));
    }
    let overwrite_chunk = overwrite.build();
    let outcome = sched.db.commit_chunk(overwrite_chunk, &cache);

    assert_eq!(outcome.deferred_decrements.len(), 2);
}
/// Clearing the `prefix::` prefix after three records were stored under it
/// reports three deferred decrements.
#[test]
fn clear_prefix_produces_decrements() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // Store `prefix::a`, `prefix::b` and `prefix::c`.
    let mut seed = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    for suffix in ["a", "b", "c"] {
        seed.insert::<TestPartition, _>(format!("prefix::{}", suffix), make_record(suffix));
    }
    let seed_chunk = seed.build();
    let _ = sched.db.commit_chunk(seed_chunk, &cache);

    let mut clearing = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    clearing.clear_prefix(TestPartition::KEY, "prefix::");
    let clearing_chunk = clearing.build();
    let outcome = sched.db.commit_chunk(clearing_chunk, &cache);

    assert_eq!(outcome.deferred_decrements.len(), 3);
}
/// While the collector is marking, feeding the new hashes of a commit to
/// `add_to_gray` leaves the gray queue non-empty.
#[test]
fn write_barrier_fires_during_marking() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // Enter the marking phase with no roots.
    assert!(sched.gc.start_marking(std::iter::empty()));
    assert!(sched.gc.is_marking());

    let fresh = make_record("new_during_marking");
    let mut chunk_writer = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    chunk_writer.insert::<TestPartition, _>(String::from("key::new"), fresh);
    let chunk = chunk_writer.build();
    let outcome = sched.db.commit_chunk(chunk, &cache);

    sched.gc.add_to_gray(outcome.new_hashes.iter().copied());

    let queued = sched.gc.gray_queue_len();
    assert!(queued > 0);
}
/// When the collector is not marking, `add_to_gray` leaves the gray queue
/// length at zero.
#[test]
fn write_barrier_skipped_when_idle() {
    let (sched, _conn) = test_scheduler();

    assert!(!sched.gc.is_marking());
    assert_eq!(sched.gc.gray_queue_len(), 0);

    let content = ContentHash::new(&[1, 2, 3]);
    let hash_ref = ContentHashRef::new(TestPartition::KEY, content);
    sched.gc.add_to_gray(std::iter::once(hash_ref));

    assert_eq!(sched.gc.gray_queue_len(), 0);
}
/// When the collector is in the sweeping phase, `add_to_gray` leaves the gray
/// queue length at zero.
///
/// The phase transitions used as setup are asserted, as sibling tests do, so
/// that a failed transition is reported at the call that failed rather than
/// only through the later phase check.
#[test]
fn write_barrier_skipped_when_sweeping() {
    let (scheduler, _conn) = test_scheduler();

    // Drive the collector into sweeping: start marking with no roots, then
    // finish marking.
    assert!(scheduler.gc.start_marking(std::iter::empty()));
    assert!(scheduler.gc.finish_marking());
    assert_eq!(scheduler.gc.phase(), GcPhase::Sweeping);

    let hash = ContentHash::new(&[4, 5, 6]);
    scheduler.gc.add_to_gray(std::iter::once(ContentHashRef::new(TestPartition::KEY, hash)));

    assert_eq!(scheduler.gc.gray_queue_len(), 0);
}
/// Decrements queued from an overwriting commit are reaped (non-zero count)
/// when `reap` is given the scheduler's oldest running epoch.
#[test]
fn reaper_processes_eligible_decrements() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // Seed `key::a`.
    let first_version = make_record("v1");
    let mut initial = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    initial.insert::<TestPartition, _>(String::from("key::a"), first_version);
    let initial_chunk = initial.build();
    let _ = sched.db.commit_chunk(initial_chunk, &cache);

    // Overwrite it and hand the resulting decrements to the reaper.
    let second_version = make_record("v2");
    let mut overwrite = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    overwrite.insert::<TestPartition, _>(String::from("key::a"), second_version);
    let overwrite_chunk = overwrite.build();
    let outcome = sched.db.commit_chunk(overwrite_chunk, &cache);
    sched.reaper.queue_decrements(outcome.deferred_decrements);

    let horizon = sched.oldest_running_epoch();
    let reaped = sched.reaper.reap(horizon, usize::MAX);
    assert!(reaped > 0);
}
/// A decrement queued with epoch 8 is not reaped while the oldest running
/// epoch is 5.
#[test]
fn reaper_skips_decrements_at_active_epoch() {
    let (sched, _conn) = test_scheduler();
    sched.register_active_epoch(GenerationEpoch::new(5));

    let pending_hash = ContentHash::new(&[10, 20, 30]);
    sched
        .reaper
        .queue_decrement(TestPartition::KEY, pending_hash, GenerationEpoch::new(8));

    let horizon = sched.oldest_running_epoch();
    assert_eq!(horizon, GenerationEpoch::new(5));

    let reaped = sched.reaper.reap(horizon, usize::MAX);
    assert_eq!(reaped, 0);
}
/// Overwriting two previously committed keys in one chunk and reaping the
/// resulting decrements removes at least two entries.
///
/// NOTE(review): nothing visible in this test links the "child" and "parent"
/// records to each other; confirm whether a real reference chain is intended.
#[test]
fn reaper_cascade_removes_chain() {
    let (scheduler, _conn) = test_scheduler();
    let sc = source_cache();

    // Commit the first versions of both keys in separate chunks.
    let mut w1 = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    w1.insert::<TestPartition, _>("key::child".to_string(), make_record("child"));
    let _ = scheduler.db.commit_chunk(w1.build(), &sc);

    let mut w2 = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    w2.insert::<TestPartition, _>("key::parent".to_string(), make_record("parent"));
    let _ = scheduler.db.commit_chunk(w2.build(), &sc);

    // Overwrite both keys at once and queue the deferred decrements.
    let mut w3 = RecordWriter::<TestPartitions>::new(Ident::new("t3"));
    w3.insert::<TestPartition, _>("key::child".to_string(), make_record("child_v2"));
    w3.insert::<TestPartition, _>("key::parent".to_string(), make_record("parent_v2"));
    let result = scheduler.db.commit_chunk(w3.build(), &sc);
    scheduler.reaper.queue_decrements(result.deferred_decrements);

    let oldest = scheduler.oldest_running_epoch();
    let removed = scheduler.reaper.reap(oldest, usize::MAX);
    assert!(removed >= 2);
}
/// Reaping when the test has queued nothing reports zero removals.
#[test]
fn reaper_empty_queue_returns_zero() {
    let (sched, _conn) = test_scheduler();

    let horizon = sched.oldest_running_epoch();

    assert_eq!(sched.reaper.reap(horizon, usize::MAX), 0);
}
/// Runs two complete mark/sweep cycles rooted at the index hashes and checks
/// that the live and replacement records are still in the store while the
/// overwritten ("garbage") record is gone.
#[test]
fn full_gc_cycle_via_scheduler() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // A record that stays referenced by its key.
    let live = make_record("live");
    let live_hash = live.content_hash();
    let mut live_writer = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    live_writer.insert::<TestPartition, _>(String::from("key::live"), live);
    let live_chunk = live_writer.build();
    let _ = sched.db.commit_chunk(live_chunk, &cache);

    // A record that is about to be overwritten.
    let garbage = make_record("garbage");
    let garbage_hash = garbage.content_hash();
    let mut garbage_writer = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    garbage_writer.insert::<TestPartition, _>(String::from("key::garbage_key"), garbage);
    let garbage_chunk = garbage_writer.build();
    let _ = sched.db.commit_chunk(garbage_chunk, &cache);

    // The record that takes over the garbage record's key.
    let replacement = make_record("replacement");
    let replacement_hash = replacement.content_hash();
    let mut replacement_writer = RecordWriter::<TestPartitions>::new(Ident::new("t3"));
    replacement_writer.insert::<TestPartition, _>(String::from("key::garbage_key"), replacement);
    let replacement_chunk = replacement_writer.build();
    let _ = sched.db.commit_chunk(replacement_chunk, &cache);

    // First cycle.
    let first_roots = sched.db.collect_index_hashes();
    assert!(sched.gc.start_marking(first_roots.into_iter()));
    let stores = sched.db.get_store().stores();
    // Tick at most 100 times, stopping at the first tick that reports done.
    let _ = (0..100).any(|_| TestPartitions::gc_mark_tick(&sched.gc, stores, 100));
    assert!(sched.gc.finish_marking());
    TestPartitions::gc_sweep(&sched.gc, stores);
    sched.gc.finish_sweep();

    // Second cycle.
    let second_roots = sched.db.collect_index_hashes();
    assert!(sched.gc.start_marking(second_roots.into_iter()));
    let _ = (0..100).any(|_| TestPartitions::gc_mark_tick(&sched.gc, stores, 100));
    assert!(sched.gc.finish_marking());
    TestPartitions::gc_sweep(&sched.gc, stores);
    sched.gc.finish_sweep();

    let partition: &PartitionStore<TestPartition> = stores.store();
    assert!(partition.get(&live_hash).is_some());
    assert!(partition.get(&replacement_hash).is_some());
    assert!(partition.get(&garbage_hash).is_none());
}
/// A record committed after marking has started is not marked black, yet it
/// is still present in the store after the sweep.
#[test]
fn epoch_safety_protects_new_records() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // One commit before the collection starts; the collector's epoch is then
    // expected to be 1.
    let mut warmup = RecordWriter::<TestPartitions>::new(Ident::new("t0"));
    warmup.insert::<TestPartition, _>(String::from("key::dummy"), make_record("dummy"));
    let warmup_chunk = warmup.build();
    let _ = sched.db.commit_chunk(warmup_chunk, &cache);

    assert!(sched.gc.start_marking(std::iter::empty()));
    let marking_epoch = sched.gc.epoch();
    assert_eq!(marking_epoch, GenerationEpoch::new(1));

    // Commit a new record while marking is in progress.
    let late = make_record("new_during_gc");
    let late_hash = late.content_hash();
    let mut late_writer = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    late_writer.insert::<TestPartition, _>(String::from("key::new"), late);
    let late_chunk = late_writer.build();
    let _ = sched.db.commit_chunk(late_chunk, &cache);

    let stores = sched.db.get_store().stores();
    // Tick at most 10 times, stopping at the first tick that reports done.
    let _ = (0..10).any(|_| TestPartitions::gc_mark_tick(&sched.gc, stores, 100));

    assert!(!sched.gc.is_black(&late_hash));

    sched.gc.finish_marking();
    TestPartitions::gc_sweep(&sched.gc, stores);
    sched.gc.finish_sweep();

    let partition: &PartitionStore<TestPartition> = stores.store();
    assert!(partition.get(&late_hash).is_some());
}
/// Queues decrements from an overwrite and from a prefix clear, reaps them,
/// then runs one mark/sweep cycle; afterwards neither the overwritten record
/// nor the cleared record is in the store.
#[test]
fn reaper_then_mark_sweep_cleans_all() {
    let (sched, _conn) = test_scheduler();
    let cache = source_cache();

    // Record that will be overwritten.
    let victim = make_record("victim");
    let victim_hash = victim.content_hash();
    let mut victim_writer = RecordWriter::<TestPartitions>::new(Ident::new("t1"));
    victim_writer.insert::<TestPartition, _>(String::from("key::overwrite"), victim);
    let victim_chunk = victim_writer.build();
    let _ = sched.db.commit_chunk(victim_chunk, &cache);

    // Record that will be dropped by clearing its key prefix.
    let orphan = make_record("orphan_record");
    let orphan_hash = orphan.content_hash();
    let mut orphan_writer = RecordWriter::<TestPartitions>::new(Ident::new("t_orphan"));
    orphan_writer.insert::<TestPartition, _>(String::from("orphan::x"), orphan);
    let orphan_chunk = orphan_writer.build();
    let _ = sched.db.commit_chunk(orphan_chunk, &cache);

    // Overwrite the victim and queue the decrements.
    let successor = make_record("victim_v2");
    let mut overwrite = RecordWriter::<TestPartitions>::new(Ident::new("t2"));
    overwrite.insert::<TestPartition, _>(String::from("key::overwrite"), successor);
    let overwrite_chunk = overwrite.build();
    let overwrite_outcome = sched.db.commit_chunk(overwrite_chunk, &cache);
    sched.reaper.queue_decrements(overwrite_outcome.deferred_decrements);

    // Clear the orphan's prefix and queue those decrements too.
    let mut clearing = RecordWriter::<TestPartitions>::new(Ident::new("t3"));
    clearing.clear_prefix(TestPartition::KEY, "orphan::");
    let clearing_chunk = clearing.build();
    let clear_outcome = sched.db.commit_chunk(clearing_chunk, &cache);
    sched.reaper.queue_decrements(clear_outcome.deferred_decrements);

    let horizon = sched.oldest_running_epoch();
    let reaped = sched.reaper.reap(horizon, usize::MAX);
    assert!(reaped > 0);

    // One mark/sweep cycle rooted at the index hashes.
    let roots = sched.db.collect_index_hashes();
    assert!(sched.gc.start_marking(roots.into_iter()));
    let stores = sched.db.get_store().stores();
    // Tick at most 100 times, stopping at the first tick that reports done.
    let _ = (0..100).any(|_| TestPartitions::gc_mark_tick(&sched.gc, stores, 100));
    sched.gc.finish_marking();
    TestPartitions::gc_sweep(&sched.gc, stores);
    sched.gc.finish_sweep();

    let partition: &PartitionStore<TestPartition> = stores.store();
    assert!(partition.get(&victim_hash).is_none(), "victim should be reaped");
    assert!(partition.get(&orphan_hash).is_none(), "orphan should be swept");
}