// laburnum 1.17.1
//
// An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
// Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  super::common::*,
  crate::scheduler::lanes::*,
  macro_rules_attribute::apply,
  std::{
    sync::{
      Arc,
      atomic::{
        AtomicUsize,
        Ordering,
      },
    },
    time::Duration,
  },
};

/// Queues a batch of yield-heavy tasks on `DEFAULT_LANE` before any worker is
/// spawned, then spawns the workers and checks that every task is eventually
/// counted as started and, afterwards, as completed.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_multiple_workers_execute_concurrently() {
  // Number of tasks queued, and how often each one yields before finishing.
  const TASK_COUNT: usize = 10;
  const YIELDS_PER_TASK: usize = 1000;

  let (scheduler, _connection) = test_scheduler();

  // Shared counters bumped by each task on entry and on exit.
  let start_count = Arc::new(AtomicUsize::new(0));
  let finish_count = Arc::new(AtomicUsize::new(0));

  for _ in 0..TASK_COUNT {
    let on_start = Arc::clone(&start_count);
    let on_finish = Arc::clone(&finish_count);
    scheduler.queue(
      move |_control| async move {
        on_start.fetch_add(1, Ordering::SeqCst);
        // Yield repeatedly so the task stays in flight across many polls.
        for _ in 0..YIELDS_PER_TASK {
          futures_lite::future::yield_now().await;
        }
        on_finish.fetch_add(1, Ordering::SeqCst);
        None
      },
      DEFAULT_LANE,
    );
  }

  // Workers are spawned only after the whole batch has been queued.
  scheduler.spawn_workers();

  let all_started = wait_for(
    || start_count.load(Ordering::SeqCst) == TASK_COUNT,
    Duration::from_secs(5),
  )
  .await;
  assert!(all_started, "Not all tasks started");

  let all_finished = wait_for(
    || finish_count.load(Ordering::SeqCst) == TASK_COUNT,
    Duration::from_secs(10),
  )
  .await;
  assert!(all_finished, "Not all tasks completed");
}

/// Queues the same number of yield-heavy tasks on `SYNC_LANE`, `DEFAULT_LANE`
/// and `ASYNC_LANE1`, spawns the workers, and checks that every task on every
/// lane is counted as completed before the timeout expires.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_workers_handle_multiple_lanes() {
  // Tasks queued per lane, and how often each one yields before finishing.
  const TASKS_PER_LANE: usize = 5;
  const YIELDS_PER_TASK: usize = 1000;

  let (scheduler, _connection) = test_scheduler();

  // One completion counter per lane.
  let sync_done = Arc::new(AtomicUsize::new(0));
  let default_done = Arc::new(AtomicUsize::new(0));
  let async_done = Arc::new(AtomicUsize::new(0));

  for _ in 0..TASKS_PER_LANE {
    let sync_counter = Arc::clone(&sync_done);
    let default_counter = Arc::clone(&default_done);
    let async_counter = Arc::clone(&async_done);

    // Each round queues one task per lane, in sync -> default -> async order.
    scheduler.queue(
      move |_control| async move {
        for _ in 0..YIELDS_PER_TASK {
          futures_lite::future::yield_now().await;
        }
        sync_counter.fetch_add(1, Ordering::SeqCst);
        None
      },
      SYNC_LANE,
    );

    scheduler.queue(
      move |_control| async move {
        for _ in 0..YIELDS_PER_TASK {
          futures_lite::future::yield_now().await;
        }
        default_counter.fetch_add(1, Ordering::SeqCst);
        None
      },
      DEFAULT_LANE,
    );

    scheduler.queue(
      move |_control| async move {
        for _ in 0..YIELDS_PER_TASK {
          futures_lite::future::yield_now().await;
        }
        async_counter.fetch_add(1, Ordering::SeqCst);
        None
      },
      ASYNC_LANE1,
    );
  }

  // Workers are spawned only after all lanes have been filled.
  scheduler.spawn_workers();

  let every_lane_done = wait_for(
    || {
      [&sync_done, &default_done, &async_done]
        .iter()
        .all(|counter| counter.load(Ordering::SeqCst) == TASKS_PER_LANE)
    },
    Duration::from_secs(5),
  )
  .await;

  assert!(
    every_lane_done,
    "Not all tasks completed: sync={}, default={}, async={}",
    sync_done.load(Ordering::SeqCst),
    default_done.load(Ordering::SeqCst),
    async_done.load(Ordering::SeqCst)
  );
}