use {
super::common::{
TestPartition, TestPartitions, TestRecordData, TestSortKey, test_scheduler,
},
crate::{
Ident,
database::{PartitionKey, RecordKey, chunk::RecordWriter, query::SortKeyCondition},
scheduler::watcher_owner_id,
source::cache::reporter::SourceCacheReader,
},
std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
/// Test helper: commits one record under `sort_key` and tells the scheduler about it.
///
/// Builds a single-record chunk for `TestPartition` (payload `v-<sort_key>`), commits it
/// to the scheduler's database against an empty source cache, and hands the commit
/// result to `on_new_chunk` under the `"writer"` identity.
fn commit(
    scheduler: &Arc<
        crate::scheduler::Scheduler<
            super::common::TestPartitions,
            crate::server::LaburnumLanguageServer,
        >,
    >,
    sort_key: &str,
) {
    let mut chunk = RecordWriter::new(Ident::new("writer"));
    let payload = TestRecordData::new(format!("v-{sort_key}"), 0);
    chunk.insert::<TestPartition>(sort_key.to_owned(), payload);

    // Keep the empty cache reader scoped to the commit call only.
    let outcome = {
        let built = chunk.build();
        let empty_cache = SourceCacheReader::new_empty_for_test();
        scheduler.db.commit_chunk(built, &empty_cache)
    };

    scheduler.on_new_chunk(Ident::new("writer"), outcome);
}
/// Registers a re-dispatch waiting on the exact key `sk1`, commits `sk1`, and expects
/// the callback to have run exactly once.
#[test]
fn redispatch_fires_on_matching_insert() {
    let (scheduler, _conn) = test_scheduler();

    // Shared counter bumped by the callback each time it runs.
    let fired = Arc::new(AtomicUsize::new(0));
    let owner = crate::Ident::new("test-owner");
    let wanted = SortKeyCondition::Exact(TestSortKey::Test(String::from("sk1")));
    let callback = {
        let fired = Arc::clone(&fired);
        move || {
            fired.fetch_add(1, Ordering::SeqCst);
        }
    };
    scheduler.register_pending_redispatch(
        owner,
        TestPartition::KEY,
        wanted,
        Box::new(callback),
    );

    commit(&scheduler, "sk1");

    let observed = fired.load(Ordering::SeqCst);
    assert_eq!(observed, 1, "a matching insert fires the re-dispatch");
}
/// A commit for a different key (`sk2`) must leave a registration for `sk1` untouched;
/// the callback only runs once `sk1` itself is committed.
#[test]
fn redispatch_survives_non_matching_insert() {
    let (scheduler, _conn) = test_scheduler();

    // Shared counter bumped by the callback each time it runs.
    let fired = Arc::new(AtomicUsize::new(0));
    let owner = crate::Ident::new("test-owner");
    let wanted = SortKeyCondition::Exact(TestSortKey::Test(String::from("sk1")));
    let callback = {
        let fired = Arc::clone(&fired);
        move || {
            fired.fetch_add(1, Ordering::SeqCst);
        }
    };
    scheduler.register_pending_redispatch(
        owner,
        TestPartition::KEY,
        wanted,
        Box::new(callback),
    );

    // Unrelated key first: nothing should happen.
    commit(&scheduler, "sk2");
    let after_miss = fired.load(Ordering::SeqCst);
    assert_eq!(
        after_miss, 0,
        "a non-matching insert must not fire the re-dispatch"
    );

    // Now the awaited key lands.
    commit(&scheduler, "sk1");
    let after_hit = fired.load(Ordering::SeqCst);
    assert_eq!(
        after_hit, 1,
        "the surviving registration fires when a matching key later lands"
    );
}
/// With a `BeginsWith("sk")` registration, two matching commits (`sk1`, `sk2`) must
/// run the callback only once in total.
#[test]
fn redispatch_is_one_shot() {
    let (scheduler, _conn) = test_scheduler();

    // Shared counter bumped by the callback each time it runs.
    let fired = Arc::new(AtomicUsize::new(0));
    let owner = crate::Ident::new("test-owner");
    let prefix = SortKeyCondition::BeginsWith(TestSortKey::Test(String::from("sk")));
    let callback = {
        let fired = Arc::clone(&fired);
        move || {
            fired.fetch_add(1, Ordering::SeqCst);
        }
    };
    scheduler.register_pending_redispatch(
        owner,
        TestPartition::KEY,
        prefix,
        Box::new(callback),
    );

    // Both keys satisfy the prefix condition.
    for key in ["sk1", "sk2"] {
        commit(&scheduler, key);
    }

    let observed = fired.load(Ordering::SeqCst);
    assert_eq!(
        observed, 1,
        "a fired re-dispatch is removed; a second matching commit does not re-fire it"
    );
}
/// Two owners register the same `BeginsWith("sk")` condition; clearing owner A before a
/// matching commit must silence A's callback while B's still runs once.
#[test]
fn clear_redispatch_for_owner_removes_only_that_owner() {
    let (scheduler, _conn) = test_scheduler();

    let fired_a = Arc::new(AtomicUsize::new(0));
    let fired_b = Arc::new(AtomicUsize::new(0));
    let owner_a = crate::Ident::new("owner-a");
    let owner_b = crate::Ident::new("owner-b");

    // Owner A's registration.
    let bump_a = {
        let fired_a = Arc::clone(&fired_a);
        move || {
            fired_a.fetch_add(1, Ordering::SeqCst);
        }
    };
    scheduler.register_pending_redispatch(
        owner_a,
        TestPartition::KEY,
        SortKeyCondition::BeginsWith(TestSortKey::Test(String::from("sk"))),
        Box::new(bump_a),
    );

    // Owner B's registration, same condition.
    let bump_b = {
        let fired_b = Arc::clone(&fired_b);
        move || {
            fired_b.fetch_add(1, Ordering::SeqCst);
        }
    };
    scheduler.register_pending_redispatch(
        owner_b,
        TestPartition::KEY,
        SortKeyCondition::BeginsWith(TestSortKey::Test(String::from("sk"))),
        Box::new(bump_b),
    );

    scheduler.clear_redispatch_for_owner(owner_a);
    commit(&scheduler, "sk1");

    let seen_a = fired_a.load(Ordering::SeqCst);
    let seen_b = fired_b.load(Ordering::SeqCst);
    assert_eq!(
        seen_a, 0,
        "owner A's registration was cleared and must not fire"
    );
    assert_eq!(
        seen_b, 1,
        "owner B's registration survives the clear and fires"
    );
}
/// Per-key instances of one watcher task must hold independent re-dispatch registrations.
///
/// Derives one owner id per record key (`sk1`..`sk4`) via `watcher_owner_id`, registers
/// an exact-match re-dispatch for each owner, clears only the second owner, then commits
/// all four keys. Every callback except the cleared one must have run exactly once.
#[test]
fn per_key_instances_do_not_clobber_each_others_subscriptions() {
    let (scheduler, _conn) = test_scheduler();
    let task_pk =
        crate::SpannedIdent::for_test(Ident::new("watcher:test::per_key"));
    let keys: Vec<RecordKey<TestPartitions>> = (1..=4)
        .map(|i| {
            RecordKey::new(TestPartition::KEY, TestSortKey::Test(format!("sk{i}")))
        })
        .collect();
    let owners: Vec<Ident> = keys
        .iter()
        .map(|k| watcher_owner_id::<TestPartitions>(task_pk, std::slice::from_ref(k), &[]))
        .collect();

    // Compare every pair, not just neighbours: a collision between, say, the first and
    // third instance would otherwise slip through unnoticed.
    for (i, a) in owners.iter().enumerate() {
        for (j, b) in owners.iter().enumerate().skip(i + 1) {
            assert_ne!(
                a,
                b,
                "per-key instances must get distinct owners (instances {} and {})",
                i + 1,
                j + 1
            );
        }
    }

    let counts: Vec<Arc<AtomicUsize>> =
        (0..4).map(|_| Arc::new(AtomicUsize::new(0))).collect();
    // Instance `i` (zero-based) waits on the exact key `sk{i + 1}`, matching `keys`.
    for (i, (owner, count)) in owners.iter().zip(&counts).enumerate() {
        let c = Arc::clone(count);
        scheduler.register_pending_redispatch(
            *owner,
            TestPartition::KEY,
            SortKeyCondition::Exact(TestSortKey::Test(format!("sk{}", i + 1))),
            Box::new(move || {
                c.fetch_add(1, Ordering::SeqCst);
            }),
        );
    }

    // Clear only the second instance; the other three must stay registered.
    scheduler.clear_redispatch_for_owner(owners[1]);
    for i in 1..=4 {
        commit(&scheduler, &format!("sk{i}"));
    }

    assert_eq!(counts[0].load(Ordering::SeqCst), 1, "instance 1 survives");
    assert_eq!(
        counts[1].load(Ordering::SeqCst),
        0,
        "instance 2 was cleared and must not fire"
    );
    assert_eq!(counts[2].load(Ordering::SeqCst), 1, "instance 3 survives");
    assert_eq!(counts[3].load(Ordering::SeqCst), 1, "instance 4 survives");
}