laburnum 1.17.1

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  super::common::*,
  crate::scheduler::lanes::*,
  macro_rules_attribute::apply,
  std::{
    future::Future,
    pin::Pin,
    sync::{
      Arc,
      atomic::{
        AtomicBool,
        AtomicUsize,
        Ordering,
      },
    },
    task::{
      Context,
      Poll,
    },
    time::Duration,
  },
};

/// Verifies that a simple task enqueued on the scheduler executes and completes
/// successfully.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_task_runs_to_completion() {
  let (scheduler, _conn) = test_scheduler();
  scheduler.spawn_workers();

  let completed = Arc::new(AtomicBool::new(false));
  let completed_clone = completed.clone();

  scheduler.queue(
    |_control| {
      async move {
        completed_clone.store(true, Ordering::SeqCst);
        None
      }
    },
    DEFAULT_LANE,
  );

  assert!(
    wait_for(|| completed.load(Ordering::SeqCst), Duration::from_secs(5)).await,
    "Task did not complete within timeout"
  );
}

/// Verifies that multiple tasks enqueued on the same lane all execute to
/// completion even when they yield many times, ensuring no tasks are lost or
/// starved.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_multiple_tasks_complete() {
  let (scheduler, _conn) = test_scheduler();

  let counter = Arc::new(AtomicUsize::new(0));
  let num_tasks = 10;

  for _ in 0..num_tasks {
    let counter_clone = counter.clone();
    scheduler.queue(
      |_control| {
        async move {
          for _ in 0..1000 {
            futures_lite::future::yield_now().await;
          }
          counter_clone.fetch_add(1, Ordering::SeqCst);
          None
        }
      },
      DEFAULT_LANE,
    );
  }

  scheduler.spawn_workers();

  assert!(
    wait_for(
      || counter.load(Ordering::SeqCst) == num_tasks,
      Duration::from_secs(5)
    )
    .await,
    "Not all tasks completed: {} / {}",
    counter.load(Ordering::SeqCst),
    num_tasks
  );
}

struct YieldingFuture {
  yields_remaining: usize,
  completed:        Arc<AtomicBool>,
}

impl Future for YieldingFuture {
  type Output = ();

  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    if self.yields_remaining > 0 {
      self.yields_remaining -= 1;
      cx.waker().wake_by_ref();
      Poll::Pending
    } else {
      self.completed.store(true, Ordering::SeqCst);
      Poll::Ready(())
    }
  }
}

/// Verifies that tasks returning Poll::Pending are correctly re-enqueued and
/// resumed until completion, testing the core yield-and-resume mechanism.
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_task_yields_and_resumes() {
  let (scheduler, _conn) = test_scheduler();
  scheduler.spawn_workers();

  let completed = Arc::new(AtomicBool::new(false));

  let future = YieldingFuture {
    yields_remaining: 5,
    completed:        completed.clone(),
  };

  scheduler.queue(
    |_control| {
      async move {
        let mut yielding = future;
        (&mut yielding).await;
        None
      }
    },
    DEFAULT_LANE,
  );

  assert!(
    wait_for(|| completed.load(Ordering::SeqCst), Duration::from_secs(5)).await,
    "Yielding task did not complete within timeout"
  );
}