use {
crate::{
Ident,
database::{
PartitionKey,
chunk::RecordWriter,
partitions::SortKeyOf,
query::SortKeyCondition,
tests::storage::{
Test1Partition, Test2Partition, TestPartitions, TestRecordData,
},
},
scheduler::{Scheduler, lanes::DEFAULT_LANE},
source::cache::reporter::SourceCacheReader,
},
macro_rules_attribute::apply,
parking_lot::Mutex,
std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
},
};
/// Short alias for the first test partition.
type P1 = Test1Partition;
/// Short alias for the second test partition (the one the watchers query).
type P2 = Test2Partition;
/// Sort-key prefix shared by the P2 rows these tests commit and by the
/// begins-with conditions the watcher tasks query and re-arm on.
const PREFIX: &str = "dep:";
/// Builds a `TestRecordData::Module` record whose single export is an
/// `Ident` created from `tag`.
fn module_record(tag: &str) -> TestRecordData {
    let exports = vec![Ident::new(tag)];
    TestRecordData::Module { exports }
}
/// Creates a scheduler for these tests and starts its workers.
///
/// The scheduler is wired to the server half of an in-memory IPC connection,
/// an initially empty filesystem list and a freshly constructed
/// `SourceCache`. Periodic GC is disabled and the idle debounce is 10 ms.
fn new_scheduler() -> Arc<Scheduler<TestPartitions, crate::server::LaburnumLanguageServer>> {
    let config = crate::scheduler::SchedulerConfiguration {
        idle_debounce: Duration::from_millis(10),
        enable_periodic_gc: false,
        rpc_response_capacity: 100,
    };

    // The client half is bound (not discarded with `_`) so it is only dropped
    // when this function returns, after the workers have been spawned.
    let (server_conn, _client_conn) = crate::connect::ipc::Connection::memory();

    // NOTE(review): the meaning of the literal `1` argument is not visible
    // from this file; confirm against `Scheduler::new_with_config`.
    let scheduler = Scheduler::new_with_config(
        server_conn,
        Arc::new(crate::server::LaburnumLanguageServer),
        Arc::new(parking_lot::RwLock::new(Vec::new())),
        Arc::new(parking_lot::RwLock::new(crate::SourceCache::new())),
        1,
        config,
    );
    scheduler.spawn_workers();
    scheduler
}
/// Polls `condition` until it returns `true` or `timeout` has elapsed.
///
/// Between polls the task yields to the executor via
/// `futures_lite::future::yield_now`, so other tasks on the same executor can
/// make progress. Returns `true` as soon as `condition` holds, otherwise
/// `false`.
///
/// The condition is evaluated one final time after the deadline. Without
/// that, a condition that became true while the last yield was in flight
/// (or any call with a zero `timeout`) would be reported as a timeout even
/// though it already holds.
async fn wait_for<F: Fn() -> bool>(condition: F, timeout: Duration) -> bool {
    let start = std::time::Instant::now();
    while start.elapsed() < timeout {
        if condition() {
            return true;
        }
        futures_lite::future::yield_now().await;
    }
    // Last-chance check: the deadline may have passed during the final yield.
    condition()
}
/// Commits one `P1` record and hands the commit result to the scheduler.
///
/// A `RecordWriter` created for `writer_id` inserts `record` under
/// `sort_key`; the built chunk is committed through `scheduler.db` using a
/// `SourceCacheReader::new_empty_for_test()` reader, and the result is passed
/// to `Scheduler::on_new_chunk` under the same `writer_id`.
fn commit_p1(
    scheduler: &Arc<Scheduler<TestPartitions, crate::server::LaburnumLanguageServer>>,
    writer_id: &str,
    sort_key: &str,
    record: TestRecordData,
) {
    let mut chunk_writer = RecordWriter::new(Ident::new(writer_id));
    chunk_writer.insert::<P1>(sort_key.to_owned(), record);
    let chunk = chunk_writer.build();

    let commit = scheduler
        .db
        .commit_chunk(chunk, &SourceCacheReader::new_empty_for_test());
    scheduler.on_new_chunk(Ident::new(writer_id), commit);
}
/// Commits one `P1` record, notifies the scheduler, and returns the keys the
/// commit reported as inserted.
///
/// The keys are cloned out of the commit result (`all_inserted_keys`) before
/// the result is moved into `Scheduler::on_new_chunk`.
fn commit_p1_keys(
    scheduler: &Arc<Scheduler<TestPartitions, crate::server::LaburnumLanguageServer>>,
    writer_id: &str,
    sort_key: &str,
    record: TestRecordData,
) -> Vec<crate::database::RecordKey<TestPartitions>> {
    let mut chunk_writer = RecordWriter::new(Ident::new(writer_id));
    chunk_writer.insert::<P1>(sort_key.to_owned(), record);
    let chunk = chunk_writer.build();

    let commit = scheduler
        .db
        .commit_chunk(chunk, &SourceCacheReader::new_empty_for_test());
    // Snapshot the inserted keys first: `on_new_chunk` takes the result by value.
    let inserted = commit.all_inserted_keys().cloned().collect::<Vec<_>>();
    scheduler.on_new_chunk(Ident::new(writer_id), commit);
    inserted
}
/// Commits one `P2` record and hands the commit result to the scheduler.
///
/// Mirrors `commit_p1` but inserts into the `P2` partition: the chunk built
/// by a `RecordWriter` for `writer_id` is committed through `scheduler.db`
/// and the result is passed to `Scheduler::on_new_chunk`.
fn commit_p2(
    scheduler: &Arc<Scheduler<TestPartitions, crate::server::LaburnumLanguageServer>>,
    writer_id: &str,
    sort_key: &str,
    record: TestRecordData,
) {
    let mut chunk_writer = RecordWriter::new(Ident::new(writer_id));
    chunk_writer.insert::<P2>(sort_key.to_owned(), record);
    let chunk = chunk_writer.build();

    let commit = scheduler
        .db
        .commit_chunk(chunk, &SourceCacheReader::new_empty_for_test());
    scheduler.on_new_chunk(Ident::new(writer_id), commit);
}
/// Queues one watcher task on `DEFAULT_LANE`.
///
/// Each run increments `run_count`, queries `Test2Partition` for sort keys
/// beginning with `PREFIX`, and stores the number of returned rows in
/// `last_seen_rows`. It then optionally registers a pending redispatch
/// (owner `"test-owner"`, partition `P2::KEY`, begins-with `PREFIX`) whose
/// callback queues another watcher run with the same arguments.
///
/// When `rearm_when_empty` is `true` the redispatch is registered only if the
/// query returned no rows; when it is `false` it is registered on every run.
fn spawn_watcher_run(
    scheduler: Arc<Scheduler<TestPartitions, crate::server::LaburnumLanguageServer>>,
    run_count: Arc<AtomicUsize>,
    last_seen_rows: Arc<AtomicUsize>,
    rearm_when_empty: bool,
) {
    let task_scheduler = scheduler.clone();
    scheduler.queue(
        move |mut ctx| {
            let sched = task_scheduler.clone();
            let runs = run_count.clone();
            let rows_seen = last_seen_rows.clone();
            async move {
                runs.fetch_add(1, Ordering::SeqCst);

                let results = ctx
                    .query_client()
                    .query(Test2Partition)
                    .sort_key_begins_with(PREFIX.to_string())
                    .execute()
                    .await;
                let row_count = results.len();
                rows_seen.store(row_count, Ordering::SeqCst);

                // Unconditional variant always re-arms; the conditional
                // variant re-arms only when the query came back empty.
                let rearm = !rearm_when_empty || row_count == 0;
                if rearm {
                    let prefix_key =
                        <P2 as SortKeyOf<TestPartitions>>::wrap_sort_key(PREFIX.to_string());
                    let condition = SortKeyCondition::BeginsWith(prefix_key);

                    // Separate handles for the callback so `sched` stays
                    // usable for the registration call itself.
                    let callback_sched = sched.clone();
                    let callback_runs = runs.clone();
                    let callback_rows = rows_seen.clone();
                    sched.register_pending_redispatch(
                        crate::Ident::new("test-owner"),
                        P2::KEY,
                        condition,
                        Box::new(move || {
                            spawn_watcher_run(
                                callback_sched.clone(),
                                callback_runs.clone(),
                                callback_rows.clone(),
                                rearm_when_empty,
                            );
                        }),
                    );
                }
                None
            }
        },
        DEFAULT_LANE,
    );
}
/// Counters sampled by `run_variant` at two points of a scenario: once after
/// the initial watcher run has settled, and once after the subsequent
/// cross-source P2 commit has had time to trigger a re-run.
struct Observed {
/// Watcher run count sampled before the cross-source P2 commit.
runs_after_p1: usize,
/// Row count the watcher last stored, sampled before the cross-source P2 commit.
rows_after_p1: usize,
/// Watcher run count sampled after the cross-source P2 commit and its wait window.
runs_after_p2: usize,
/// Row count the watcher last stored, sampled after the cross-source P2 commit.
rows_after_p2: usize,
}
/// Runs one watcher scenario against a fresh scheduler and returns the
/// sampled counters.
///
/// Steps, in order:
/// 1. If `p2_prepopulated`, commit a seed P2 row under `PREFIX`.
/// 2. Commit a P1 row and queue the watcher (`rearm_when_empty` is forwarded
///    to `spawn_watcher_run`).
/// 3. Wait (up to 5 s) for the first watcher run, let things settle for
///    50 ms, and sample the counters.
/// 4. Commit another P2 row under `PREFIX` from a different writer, wait up
///    to 2 s for an additional run (the outcome is deliberately ignored:
///    some variants are expected not to re-run), settle for 100 ms, and
///    sample again.
///
/// # Panics
/// Panics if the watcher has not run at all within the first wait.
async fn run_variant(rearm_when_empty: bool, p2_prepopulated: bool) -> Observed {
    let scheduler = new_scheduler();
    let run_count = Arc::new(AtomicUsize::new(0));
    // `usize::MAX` is the initial value until a watcher run stores a row count.
    let last_seen_rows = Arc::new(AtomicUsize::new(usize::MAX));

    // Reads both counters at once: (runs, rows).
    let sample = || {
        (
            run_count.load(Ordering::SeqCst),
            last_seen_rows.load(Ordering::SeqCst),
        )
    };

    if p2_prepopulated {
        let seed_sort_key = format!("{PREFIX}seed");
        commit_p2(
            &scheduler,
            "p2-seed-source",
            &seed_sort_key,
            module_record("dep-seed"),
        );
    }
    commit_p1(&scheduler, "p1-source", "module-a", module_record("module-a"));
    spawn_watcher_run(
        scheduler.clone(),
        run_count.clone(),
        last_seen_rows.clone(),
        rearm_when_empty,
    );

    let first_run_seen = wait_for(
        || run_count.load(Ordering::SeqCst) >= 1,
        Duration::from_secs(5),
    )
    .await;
    assert!(first_run_seen, "watcher did not run after P1 trigger");
    // A never-true condition turns `wait_for` into a fixed settle delay.
    let _ = wait_for(|| false, Duration::from_millis(50)).await;
    let (runs_after_p1, rows_after_p1) = sample();

    let trigger_sort_key = format!("{PREFIX}first");
    commit_p2(
        &scheduler,
        "p2-other-source",
        &trigger_sort_key,
        module_record("dep-first"),
    );
    let _ = wait_for(
        || run_count.load(Ordering::SeqCst) > runs_after_p1,
        Duration::from_secs(2),
    )
    .await;
    let _ = wait_for(|| false, Duration::from_millis(100)).await;
    let (runs_after_p2, rows_after_p2) = sample();

    Observed {
        runs_after_p1,
        rows_after_p1,
        runs_after_p2,
        rows_after_p2,
    }
}
fn report(label: &str, o: &Observed) {
eprintln!(
"{label}: runs_after_p1={} rows_seen_after_p1={} runs_after_p2_write={} \
rows_seen_on_rerun={} | re-ran on cross-source P2 write? {}",
o.runs_after_p1,
o.rows_after_p1,
o.runs_after_p2,
o.rows_after_p2,
o.runs_after_p2 > o.runs_after_p1,
);
}
#[apply(smol_macros::test!)]
async fn observe_reactive_prefix_redispatch_both_variants() {
let a_empty = run_variant(true, false).await;
let b_empty = run_variant(false, false).await;
eprintln!("--- Scenario 1: P2 empty on first run (dependency absent) ---");
report("VARIANT A (re-arm only when P2 empty)", &a_empty);
report("VARIANT B (re-arm unconditionally) ", &b_empty);
let a_seed = run_variant(true, true).await;
let b_seed = run_variant(false, true).await;
eprintln!(
"--- Scenario 2: P2 pre-populated on first run (dependency present) ---"
);
report("VARIANT A (re-arm only when P2 empty)", &a_seed);
report("VARIANT B (re-arm unconditionally) ", &b_seed);
assert_eq!(a_empty.rows_after_p1, 0, "P2 empty on first run");
assert_eq!(b_empty.rows_after_p1, 0, "P2 empty on first run");
assert!(
a_empty.runs_after_p2 > a_empty.runs_after_p1,
"Variant A re-runs when the first query was empty (absent-dep branch armed)"
);
assert!(
b_empty.runs_after_p2 > b_empty.runs_after_p1,
"Variant B re-runs (always armed)"
);
assert_eq!(
a_seed.rows_after_p1, 1,
"Variant A first query succeeds (sees the seeded row)"
);
assert_eq!(
a_seed.runs_after_p2, a_seed.runs_after_p1,
"Variant A does NOT re-run on the cross-source P2 write (never armed)"
);
assert!(
b_seed.runs_after_p2 > b_seed.runs_after_p1,
"Variant B re-runs on the cross-source P2 write (armed regardless)"
);
assert!(
b_seed.rows_after_p2 >= 2,
"Variant B's re-run sees the new P2 row plus the seed"
);
}
/// Per-run record of the matched keys a watcher task read back from its
/// context; each watcher run pushes one entry onto both vectors.
#[derive(Default)]
struct MatchedKeyLog {
/// For each run, the value returned by `matched_keys_updated()`.
updated_per_run: Vec<Vec<crate::database::RecordKey<TestPartitions>>>,
/// For each run, the value returned by `matched_keys_deleted()`.
deleted_per_run: Vec<Vec<crate::database::RecordKey<TestPartitions>>>,
}
/// Queues one watcher task on `DEFAULT_LANE` that carries explicit matched
/// keys.
///
/// Each run sets `updated` / `deleted` as the context's matched keys,
/// increments `run_count`, reads the matched keys back from the context and
/// appends them to `log`. On the first run only (tracked by `rearmed`, which
/// is incremented on every run) it registers a pending redispatch (owner
/// `"test-owner"`, partition `P2::KEY`, begins-with `PREFIX`) whose callback
/// queues another run with the same `updated` / `deleted` keys.
fn spawn_watcher_run_with_keys(
    scheduler: Arc<Scheduler<TestPartitions, crate::server::LaburnumLanguageServer>>,
    updated: Vec<crate::database::RecordKey<TestPartitions>>,
    deleted: Vec<crate::database::RecordKey<TestPartitions>>,
    run_count: Arc<AtomicUsize>,
    log: Arc<Mutex<MatchedKeyLog>>,
    rearmed: Arc<AtomicUsize>,
) {
    let task_scheduler = scheduler.clone();
    scheduler.queue(
        move |mut ctx| {
            let sched = task_scheduler.clone();
            let runs = run_count.clone();
            let key_log = log.clone();
            let arm_counter = rearmed.clone();
            let updated_keys = updated.clone();
            let deleted_keys = deleted.clone();
            async move {
                ctx.set_matched_keys(updated_keys.clone(), deleted_keys.clone());
                runs.fetch_add(1, Ordering::SeqCst);

                // Read the keys back before taking the lock so the guard is
                // held only for the two pushes.
                let seen_updated = ctx.matched_keys_updated();
                let seen_deleted = ctx.matched_keys_deleted();
                {
                    let mut entries = key_log.lock();
                    entries.updated_per_run.push(seen_updated);
                    entries.deleted_per_run.push(seen_deleted);
                }

                // `fetch_add` returns the previous value: zero means this is
                // the first run, the only one that re-arms.
                let is_first_run = arm_counter.fetch_add(1, Ordering::SeqCst) == 0;
                if is_first_run {
                    let prefix_key =
                        <P2 as SortKeyOf<TestPartitions>>::wrap_sort_key(PREFIX.to_string());
                    let condition = SortKeyCondition::BeginsWith(prefix_key);

                    let callback_sched = sched.clone();
                    let callback_runs = runs.clone();
                    let callback_log = key_log.clone();
                    let callback_arm_counter = arm_counter.clone();
                    let callback_updated = updated_keys.clone();
                    let callback_deleted = deleted_keys.clone();
                    sched.register_pending_redispatch(
                        crate::Ident::new("test-owner"),
                        P2::KEY,
                        condition,
                        Box::new(move || {
                            spawn_watcher_run_with_keys(
                                callback_sched.clone(),
                                callback_updated.clone(),
                                callback_deleted.clone(),
                                callback_runs.clone(),
                                callback_log.clone(),
                                callback_arm_counter.clone(),
                            );
                        }),
                    );
                }
                None
            }
        },
        DEFAULT_LANE,
    );
}
// Checks which matched keys the watcher sees when it is re-run by a
// cross-source P2 write: the re-run must carry the same matched keys as the
// first (P1-triggered) run, and the P2 key that triggered the redispatch must
// not appear among them.
#[apply(smol_macros::test!)]
async fn replayed_args_are_original_p1_keys() {
let scheduler = new_scheduler();
let run_count = Arc::new(AtomicUsize::new(0));
let rearmed = Arc::new(AtomicUsize::new(0));
let log = Arc::new(Mutex::new(MatchedKeyLog::default()));
// Phase 1: commit a single P1 record and capture the key the commit reports.
let p1_keys = commit_p1_keys(
&scheduler,
"p1-source",
"module-a",
module_record("module-a"),
);
assert_eq!(p1_keys.len(), 1, "exactly one P1 key inserted");
let original_p1_key = p1_keys[0].clone();
assert_eq!(original_p1_key.partition_key(), P1::KEY.ident());
// Phase 2: queue the watcher with the P1 key as its "updated" matched keys
// and no deleted keys.
spawn_watcher_run_with_keys(
scheduler.clone(),
p1_keys.clone(),
Vec::new(),
run_count.clone(),
log.clone(),
rearmed.clone(),
);
assert!(
wait_for(
|| run_count.load(Ordering::SeqCst) >= 1,
Duration::from_secs(5)
)
.await,
"watcher did not run after P1 trigger"
);
// A never-true condition makes `wait_for` act as a fixed settle delay, giving
// any unexpected extra run time to show up before the count is checked.
let _ = wait_for(|| false, Duration::from_millis(50)).await;
assert_eq!(
run_count.load(Ordering::SeqCst),
1,
"exactly one run after the P1 trigger, before any P2 write"
);
// Phase 3: commit a P2 row under PREFIX from a different writer; this is the
// write expected to fire the redispatch registered by the first run.
let p2_trigger_sort = format!("{PREFIX}first");
commit_p2(
&scheduler,
"p2-other-source",
&p2_trigger_sort,
module_record("dep-first"),
);
// Build the key of the P2 row just written, to assert below that it does not
// show up in the replayed matched keys.
let p2_trigger_key = crate::database::RecordKey::<TestPartitions>::new(
P2::KEY,
<P2 as SortKeyOf<TestPartitions>>::wrap_sort_key(p2_trigger_sort.clone()),
);
assert!(
wait_for(
|| run_count.load(Ordering::SeqCst) >= 2,
Duration::from_secs(2),
)
.await,
"watcher did not re-run after the cross-source P2 write"
);
// Settle delay again: the watcher re-arms only on its first run, so no third
// run should appear.
let _ = wait_for(|| false, Duration::from_millis(100)).await;
assert_eq!(
run_count.load(Ordering::SeqCst),
2,
"exactly two runs: the P1 trigger and the single P2-triggered re-run"
);
// Phase 4: inspect what each run read back from its context.
let log = log.lock();
assert_eq!(log.updated_per_run.len(), 2, "two recorded runs");
assert_eq!(
log.updated_per_run[0],
vec![original_p1_key.clone()],
"first run's matched_keys_updated is the original P1 key"
);
assert!(
log.deleted_per_run[0].is_empty(),
"first run had no deleted matched keys"
);
assert_eq!(
log.updated_per_run[1], log.updated_per_run[0],
"the P2-triggered re-run replays the SAME matched keys as the first run"
);
assert_eq!(
log.updated_per_run[1],
vec![original_p1_key.clone()],
"the re-run still carries the original P1 key"
);
assert!(
!log.updated_per_run[1].contains(&p2_trigger_key),
"the P2 trigger key must not leak into the replayed matched keys"
);
assert!(
!log.deleted_per_run[1].contains(&p2_trigger_key),
"the P2 trigger key must not leak into the replayed deleted keys"
);
// Indexing `[0]` is safe here: both runs were just asserted to hold exactly
// one updated key.
eprintln!(
"run[0] matched_updated key_ident={} | run[1] matched_updated key_ident={} \
| p2_trigger key_ident={}",
log.updated_per_run[0][0].key_ident(),
log.updated_per_run[1][0].key_ident(),
p2_trigger_key.key_ident(),
);
}