laburnum 1.17.3

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

//! `pending_deps` re-dispatch (GLD-0035): a watcher that records a dependency
//! it read while absent is re-run when a matching key later lands. These tests
//! exercise the scheduler registry directly: `register_pending_redispatch`
//! stores a one-shot thunk keyed by `(partition, condition)`, and
//! `on_new_chunk` fires the thunks whose condition matches a committed key.

use {
  super::common::{
    TestPartition, TestPartitions, TestRecordData, TestSortKey, test_scheduler,
  },
  crate::{
    Ident,
    database::{PartitionKey, RecordKey, chunk::RecordWriter, query::SortKeyCondition},
    scheduler::watcher_owner_id,
    source::cache::reporter::SourceCacheReader,
  },
  std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
  },
};

/// Commit a single record at `sort_key` to `TestPartition` and drive the
/// scheduler's post-commit dispatch (`on_new_chunk`), exactly as `poll_once`
/// does after a task commits.
fn commit(
  scheduler: &Arc<
    crate::scheduler::Scheduler<
      super::common::TestPartitions,
      crate::server::LaburnumLanguageServer,
    >,
  >,
  sort_key: &str,
) {
  // Distinct content per key: `inserted_keys` only reports keys whose content
  // is new to the content-addressed store (the early-cutoff), so two keys with
  // identical content would dedupe and the second would not be reported.
  let mut writer = RecordWriter::new(Ident::new("writer"));
  writer.insert::<TestPartition>(
    sort_key.to_string(),
    TestRecordData::new(format!("v-{sort_key}"), 0),
  );
  let result = scheduler
    .db
    .commit_chunk(writer.build(), &SourceCacheReader::new_empty_for_test());
  scheduler.on_new_chunk(Ident::new("writer"), result);
}

#[test]
fn redispatch_fires_on_matching_insert() {
  let (scheduler, _conn) = test_scheduler();

  let count = Arc::new(AtomicUsize::new(0));
  let c = count.clone();
  scheduler.register_pending_redispatch(
    crate::Ident::new("test-owner"),
    TestPartition::KEY,
    SortKeyCondition::Exact(TestSortKey::Test("sk1".to_string())),
    Box::new(move || {
      c.fetch_add(1, Ordering::SeqCst);
    }),
  );

  commit(&scheduler, "sk1");

  assert_eq!(
    count.load(Ordering::SeqCst),
    1,
    "a matching insert fires the re-dispatch"
  );
}

#[test]
fn redispatch_survives_non_matching_insert() {
  let (scheduler, _conn) = test_scheduler();

  let count = Arc::new(AtomicUsize::new(0));
  let c = count.clone();
  scheduler.register_pending_redispatch(
    crate::Ident::new("test-owner"),
    TestPartition::KEY,
    SortKeyCondition::Exact(TestSortKey::Test("sk1".to_string())),
    Box::new(move || {
      c.fetch_add(1, Ordering::SeqCst);
    }),
  );

  commit(&scheduler, "sk2");
  assert_eq!(
    count.load(Ordering::SeqCst),
    0,
    "a non-matching insert must not fire the re-dispatch"
  );

  commit(&scheduler, "sk1");
  assert_eq!(
    count.load(Ordering::SeqCst),
    1,
    "the surviving registration fires when a matching key later lands"
  );
}

#[test]
fn redispatch_is_one_shot() {
  let (scheduler, _conn) = test_scheduler();

  let count = Arc::new(AtomicUsize::new(0));
  let c = count.clone();
  scheduler.register_pending_redispatch(
    crate::Ident::new("test-owner"),
    TestPartition::KEY,
    SortKeyCondition::BeginsWith(TestSortKey::Test("sk".to_string())),
    Box::new(move || {
      c.fetch_add(1, Ordering::SeqCst);
    }),
  );

  commit(&scheduler, "sk1");
  commit(&scheduler, "sk2");

  assert_eq!(
    count.load(Ordering::SeqCst),
    1,
    "a fired re-dispatch is removed; a second matching commit does not re-fire it"
  );
}

#[test]
fn clear_redispatch_for_owner_removes_only_that_owner() {
  let (scheduler, _conn) = test_scheduler();

  let count_a = Arc::new(AtomicUsize::new(0));
  let count_b = Arc::new(AtomicUsize::new(0));
  let ca = count_a.clone();
  let cb = count_b.clone();
  let owner_a = crate::Ident::new("owner-a");
  let owner_b = crate::Ident::new("owner-b");

  scheduler.register_pending_redispatch(
    owner_a,
    TestPartition::KEY,
    SortKeyCondition::BeginsWith(TestSortKey::Test("sk".to_string())),
    Box::new(move || {
      ca.fetch_add(1, Ordering::SeqCst);
    }),
  );
  scheduler.register_pending_redispatch(
    owner_b,
    TestPartition::KEY,
    SortKeyCondition::BeginsWith(TestSortKey::Test("sk".to_string())),
    Box::new(move || {
      cb.fetch_add(1, Ordering::SeqCst);
    }),
  );

  // Clearing owner A leaves owner B's identical registration intact.
  scheduler.clear_redispatch_for_owner(owner_a);

  commit(&scheduler, "sk1");

  assert_eq!(
    count_a.load(Ordering::SeqCst),
    0,
    "owner A's registration was cleared and must not fire"
  );
  assert_eq!(
    count_b.load(Ordering::SeqCst),
    1,
    "owner B's registration survives the clear and fires"
  );
}

/// Four `per_key` instances of one watcher, each subscribed to a distinct sort
/// key in the same partition under a distinct owner id (`hash(task_pk, keys)`).
/// Re-running one instance clears ONLY its own owner; the siblings'
/// subscriptions survive. This is what keeps per-key instances from clobbering
/// each other (the owner id folds the matched keys, not just the handler).
#[test]
fn per_key_instances_do_not_clobber_each_others_subscriptions() {
  let (scheduler, _conn) = test_scheduler();

  // Mirror per_key dispatch: one instance per matched key, each owner DERIVED
  // BY THE PRODUCTION fn `watcher_owner_id` (which folds task_pk + matched
  // keys). Deriving them — rather than hand-building distinct ids — is what
  // makes this catch a regression that folded only task_pk: that would give
  // every instance the same owner, so clearing one would wipe the others and
  // fail the asserts below.
  let task_pk =
    crate::SpannedIdent::for_test(Ident::new("watcher:test::per_key"));
  let keys: Vec<RecordKey<TestPartitions>> = (1..=4)
    .map(|i| {
      RecordKey::new(TestPartition::KEY, TestSortKey::Test(format!("sk{i}")))
    })
    .collect();
  let owners: Vec<Ident> = keys
    .iter()
    .map(|k| watcher_owner_id::<TestPartitions>(task_pk, std::slice::from_ref(k), &[]))
    .collect();

  // The crux of per-key isolation: distinct matched keys → distinct owners.
  assert_ne!(owners[0], owners[1], "per-key instances must get distinct owners");
  assert_ne!(owners[1], owners[2], "per-key instances must get distinct owners");
  assert_ne!(owners[2], owners[3], "per-key instances must get distinct owners");

  let counts: Vec<Arc<AtomicUsize>> =
    (0..4).map(|_| Arc::new(AtomicUsize::new(0))).collect();

  for i in 0..4 {
    let c = counts[i].clone();
    scheduler.register_pending_redispatch(
      owners[i],
      TestPartition::KEY,
      SortKeyCondition::Exact(TestSortKey::Test(format!("sk{}", i + 1))),
      Box::new(move || {
        c.fetch_add(1, Ordering::SeqCst);
      }),
    );
  }

  // Instance 2 re-runs: it clears its own owner before re-registering.
  scheduler.clear_redispatch_for_owner(owners[1]);

  // Commit each instance's key. Instances 1, 3, 4 still fire; 2 was cleared.
  for i in 1..=4 {
    commit(&scheduler, &format!("sk{i}"));
  }

  assert_eq!(counts[0].load(Ordering::SeqCst), 1, "instance 1 survives");
  assert_eq!(
    counts[1].load(Ordering::SeqCst),
    0,
    "instance 2 was cleared and must not fire"
  );
  assert_eq!(counts[2].load(Ordering::SeqCst), 1, "instance 3 survives");
  assert_eq!(counts[3].load(Ordering::SeqCst), 1, "instance 4 survives");
}