laburnum 1.17.1

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  super::common::*,
  crate::scheduler::lanes::*,
  macro_rules_attribute::apply,
  std::{
    sync::{
      Arc,
      atomic::{
        AtomicUsize,
        Ordering,
      },
    },
    time::Duration,
  },
};

/// Verifies that SYNC_LANE tasks preempt lower priority tasks even when
/// enqueued after them, and that high-priority tasks maintain their priority
/// even when yielding. Uses single worker to ensure deterministic execution
/// order.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_sync_lane_highest_priority() {
  let (scheduler, _conn) = test_scheduler();

  let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));
  let yield_count = Arc::new(AtomicUsize::new(0));

  for i in 0..100 {
    let order = execution_order.clone();
    let yields = yield_count.clone();
    scheduler.queue(
      move |_ctx| {
        async move {
          for _ in 0..5 {
            yields.fetch_add(1, Ordering::SeqCst);
            futures_lite::future::yield_now().await;
          }
          order.lock().push(format!("default_{}", i));
          None
        }
      },
      DEFAULT_LANE,
    );
  }

  scheduler.spawn_workers();

  assert!(
    wait_for(
      || yield_count.load(Ordering::SeqCst) > 50,
      Duration::from_secs(5)
    )
    .await,
    "Default tasks should start yielding"
  );

  let order = execution_order.clone();
  let sync_yields = yield_count.clone();
  scheduler.queue(
    |_control| {
      async move {
        for _ in 0..10 {
          sync_yields.fetch_add(1, Ordering::SeqCst);
          futures_lite::future::yield_now().await;
        }
        order.lock().push("sync_task".to_string());
        None
      }
    },
    SYNC_LANE,
  );

  assert!(
    wait_for(
      || execution_order.lock().contains(&"sync_task".to_string()),
      Duration::from_secs(5)
    )
    .await,
    "Sync task did not execute"
  );

  assert!(
    wait_for(
      || execution_order.lock().len() == 101,
      Duration::from_secs(5)
    )
    .await,
    "All tasks should complete"
  );

  let total_yields = yield_count.load(Ordering::SeqCst);
  let expected_yields = (100 * 5) + 10;
  assert_eq!(
    total_yields, expected_yields,
    "Expected {} yields (100 tasks * 5 + sync task * 10), got {}",
    expected_yields, total_yields
  );

  let order = execution_order.lock();
  let sync_pos = order
    .iter()
    .position(|s| s == "sync_task")
    .expect("sync task should be in order");

  assert!(
    sync_pos < 100,
    "Sync task should execute before all default tasks finish, but was at position {}",
    sync_pos
  );
}

/// Verifies that tasks are executed in strict priority order across different
/// lanes: SYNC_LANE (highest) -> DEFAULT_LANE -> SPECULATIVE_LANE (lowest).
/// Uses single worker to ensure deterministic execution order.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_priority_ordering() {
  let (scheduler, _conn) = test_scheduler();

  let execution_order = Arc::new(parking_lot::Mutex::new(Vec::new()));

  let order = execution_order.clone();
  scheduler.queue(
    |_control| {
      async move {
        order.lock().push("speculative");
        None
      }
    },
    SPECULATIVE_LANE,
  );

  let order = execution_order.clone();
  scheduler.queue(
    |_control| {
      async move {
        order.lock().push("default");
        None
      }
    },
    DEFAULT_LANE,
  );

  let order = execution_order.clone();
  scheduler.queue(
    |_control| {
      async move {
        order.lock().push("sync");
        None
      }
    },
    SYNC_LANE,
  );

  scheduler.spawn_workers();

  assert!(
    wait_for(|| execution_order.lock().len() == 3, Duration::from_secs(5))
      .await,
    "Not all tasks completed"
  );

  let order = execution_order.lock();
  let sync_pos = order.iter().position(|s| *s == "sync").unwrap();
  let default_pos = order.iter().position(|s| *s == "default").unwrap();
  let spec_pos = order.iter().position(|s| *s == "speculative").unwrap();

  assert!(
    sync_pos < default_pos,
    "Sync should execute before default: sync={}, default={}",
    sync_pos,
    default_pos
  );
  assert!(
    default_pos < spec_pos,
    "Default should execute before speculative: default={}, spec={}",
    default_pos,
    spec_pos
  );
}

/// Queues several long-running, heavily yielding tasks on DEFAULT_LANE and
/// checks that every one of them runs to completion within the timeout,
/// i.e. that no task on the lane is starved by its siblings.
///
/// NOTE(review): only completion is asserted here, not the interleaving
/// between tasks; whether a single worker is used depends on
/// `test_scheduler()` — confirm against the helper.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_same_lane_fairness() {
  const TASK_COUNT: usize = 10;
  const YIELDS_PER_TASK: usize = 1000;

  let (scheduler, _conn) = test_scheduler();

  // Number of tasks that have reached the end of their body.
  let finished = Arc::new(AtomicUsize::new(0));

  for _ in 0..TASK_COUNT {
    let finished = Arc::clone(&finished);
    scheduler.queue(
      |_control| {
        async move {
          // Hand control back repeatedly so sibling tasks get a turn.
          for _ in 0..YIELDS_PER_TASK {
            futures_lite::future::yield_now().await;
          }
          finished.fetch_add(1, Ordering::SeqCst);
          None
        }
      },
      DEFAULT_LANE,
    );
  }

  scheduler.spawn_workers();

  let all_finished = wait_for(
    || finished.load(Ordering::SeqCst) == TASK_COUNT,
    Duration::from_secs(5),
  )
  .await;

  assert!(
    all_finished,
    "Not all same-lane tasks completed (starvation): {} / {}",
    finished.load(Ordering::SeqCst),
    TASK_COUNT
  );
}