laburnum 1.17.0

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  crate::{
    Partitions,
    TRACER,
    protocol::lsp::LanguageServer,
    scheduler::{
      Scheduler,
      lanes::{
        Lane,
        RPC_LANE_HIGH_IDX,
        RPC_LANE_LOW_IDX,
        is_rpc_lane,
        lane_priority,
      },
      task::LaburnumTask,
    },
  },
  std::{
    sync::{
      Arc,
      atomic::Ordering,
    },
    task::Poll,
    thread,
    time::Instant,
  },
};

/// A scheduler worker: owns per-thread polling state and pulls tasks from the
/// shared [`Scheduler`] lane queues until the scheduler's shutdown flag is set.
pub(crate) struct Worker<P: Partitions, T: LanguageServer<P>> {
  /// Identifier supplied by the caller; reported as `worker.id` on the
  /// worker's tracing span.
  id:              usize,
  /// Shared scheduler handle; source of the lane queues and shutdown flag.
  scheduler:       Arc<Scheduler<P, T>>,
  /// Worker-local backlog that `run` drains (last-in, first-out) before it
  /// looks at the shared queues. Nothing in the code visible here pushes to it.
  local_same_lane: Vec<Arc<LaburnumTask<P, T>>>,
  /// Lane of the task most recently taken from the shared queues or chosen by
  /// preemption. Written but not read in the code visible here.
  current_lane:    Option<Lane>,
  /// Number of polls of tasks outside `SYNC_LANE`; drives the periodic
  /// higher-priority check in `poll_task`.
  poll_count:      usize,
  /// Time of the last scan of the RPC lanes or the last poll of an RPC-lane
  /// task, whichever happened later.
  last_rpc_check:  Instant,
  /// Trace context attached for the duration of `run`.
  trace_context:   crate::protocol::otel::TraceContext,
}

impl<P: Partitions, T: LanguageServer<P>> Worker<P, T> {
  /// Minimum time that must have elapsed since `last_rpc_check` before
  /// `steal_work` scans the RPC lanes again.
  const RPC_POLL_INTERVAL: std::time::Duration =
    std::time::Duration::from_millis(1);
  /// `poll_task` looks for higher-priority work whenever `poll_count` is a
  /// multiple of this value.
  const SYNC_CHECK_PRIORITY_INTERVAL: usize = 16;

  /// Builds a worker with an empty local backlog, no current lane and a zeroed
  /// poll counter.
  ///
  /// `last_rpc_check` is back-dated by [`Self::RPC_POLL_INTERVAL`] so that the
  /// first call to `steal_work` is already allowed to scan the RPC lanes.
  ///
  /// * `id` - identifier reported on the worker's tracing span.
  /// * `scheduler` - shared scheduler whose queues this worker will drain.
  /// * `trace_context` - context attached when the worker starts running.
  pub(crate) fn new(
    id: usize,
    scheduler: Arc<Scheduler<P, T>>,
    trace_context: crate::protocol::otel::TraceContext,
  ) -> Self {
    let now = Instant::now();
    // `Instant - Duration` panics when the result cannot be represented
    // (e.g. a monotonic clock that starts close to zero). Fall back to `now`;
    // the only cost is that the first RPC scan waits one interval.
    let last_rpc_check =
      now.checked_sub(Self::RPC_POLL_INTERVAL).unwrap_or(now);

    Self {
      id,
      scheduler,
      local_same_lane: Vec::new(),
      current_lane: None,
      poll_count: 0,
      last_rpc_check,
      trace_context,
    }
  }

  /// Starts a new OS thread that constructs a [`Worker`] from the given
  /// arguments and runs it until `run` returns.
  ///
  /// Returns the join handle of the spawned thread.
  pub(crate) fn spawn(
    id: usize,
    scheduler: Arc<Scheduler<P, T>>,
    trace_context: crate::protocol::otel::TraceContext,
  ) -> thread::JoinHandle<()> {
    // The worker is created on the new thread, so it never crosses threads
    // itself; only its constructor arguments are moved into the closure.
    let worker_main = move || Worker::new(id, scheduler, trace_context).run();
    thread::spawn(worker_main)
  }

  /// Worker main loop. Attaches the trace context, opens the worker span and
  /// then keeps polling tasks until the scheduler's shutdown flag is observed.
  ///
  /// Each iteration picks exactly one action, in this order:
  /// 1. poll a task from the worker-local backlog,
  /// 2. otherwise poll a task taken from the shared queues (recording its lane
  ///    as the current lane),
  /// 3. otherwise park.
  fn run(&mut self) {
    let _context_guard = self.trace_context.attach();

    otel::span!("laburnum.worker.run", "worker.id" = self.id as i64);

    // The flag is re-read before every iteration, so shutdown is noticed after
    // at most one task poll or one park timeout.
    while !self.scheduler.shutdown_flag.load(Ordering::Acquire) {
      if let Some(local) = self.local_same_lane.pop() {
        // Local backlog goes first; `current_lane` is left untouched.
        self.poll_task(local);
      } else if let Some(stolen) = self.steal_work() {
        self.current_lane = Some(stolen.lane);
        self.poll_task(stolen);
      } else {
        // Nothing runnable was found this round.
        self.park();
      }
    }
  }

  /// Polls `task` once, unless it gets preempted first.
  ///
  /// Tasks outside `SYNC_LANE` bump `poll_count`. Whenever the counter is a
  /// multiple of [`Self::SYNC_CHECK_PRIORITY_INTERVAL`], the queues ahead of
  /// the task's lane are checked; if one yields a task, `task` is handed back
  /// to the scheduler unpolled and the higher-priority task is polled instead.
  ///
  /// After polling an RPC-lane task, `last_rpc_check` is reset to now.
  fn poll_task(&mut self, task: Arc<LaburnumTask<P, T>>) {
    let lane = task.lane;

    // `SYNC_LANE` tasks neither count towards the interval nor get preempted.
    if lane != super::lanes::SYNC_LANE {
      self.poll_count += 1;

      let check_due = self
        .poll_count
        .is_multiple_of(Self::SYNC_CHECK_PRIORITY_INTERVAL);
      // Only touch the shared queues when the periodic check is due.
      let preempting = if check_due {
        self.has_higher_priority_work(lane)
      } else {
        None
      };

      if let Some(higher) = preempting {
        // Return the displaced task through the entry point matching its lane.
        if is_rpc_lane(lane) {
          self.scheduler.queue_rpc_task(task);
        } else {
          self.scheduler.queue_task(task);
        }
        self.current_lane = Some(higher.lane);
        // The nested call increments `poll_count` past the multiple (or skips
        // the check for `SYNC_LANE`), so it does not preempt again.
        self.poll_task(higher);
        return;
      }
    }

    // Both outcomes need no follow-up here.
    // NOTE(review): presumably a pending task is re-queued by its waker —
    // confirm against `LaburnumTask::poll_once`.
    match task.poll_once() {
      | Poll::Ready(()) | Poll::Pending => {},
    }

    if is_rpc_lane(lane) {
      self.last_rpc_check = Instant::now();
    }
  }

  /// Takes one task from the shared lane queues, scanning in index order.
  ///
  /// Scan order:
  /// 1. lanes `0..RPC_LANE_HIGH_IDX`,
  /// 2. the RPC lanes, but only if at least [`Self::RPC_POLL_INTERVAL`] has
  ///    passed since `last_rpc_check` (which is then reset to now),
  /// 3. lanes `RPC_LANE_LOW_IDX + 1 ..= 30`.
  ///
  /// Returns `None` when none of the scanned queues yields a task.
  ///
  /// NOTE(review): when the RPC interval has not elapsed and the other lanes
  /// are empty, this returns `None` even if RPC tasks are queued, and the
  /// caller then parks — confirm that this is intended.
  fn steal_work(&mut self) -> Option<Arc<LaburnumTask<P, T>>> {
    let ahead_of_rpc = (0..RPC_LANE_HIGH_IDX)
      .find_map(|idx| self.scheduler.lane_queues[idx].pop().ok());
    if ahead_of_rpc.is_some() {
      return ahead_of_rpc;
    }

    let rpc_scan_due =
      self.last_rpc_check.elapsed() >= Self::RPC_POLL_INTERVAL;
    if rpc_scan_due {
      // Stamp the scan time first so an empty scan also restarts the interval.
      self.last_rpc_check = Instant::now();
      let rpc_task = self.steal_rpc_work();
      if rpc_task.is_some() {
        return rpc_task;
      }
    }

    // NOTE(review): the upper bound 30 is hard-coded; presumably it is the
    // last lane index — confirm against the length of `lane_queues`.
    ((RPC_LANE_LOW_IDX + 1)..=30)
      .find_map(|idx| self.scheduler.lane_queues[idx].pop().ok())
  }

  /// Takes one task from the RPC lanes (`RPC_LANE_HIGH_IDX` through
  /// `RPC_LANE_LOW_IDX`, inclusive), trying the lowest index first.
  ///
  /// Returns `None` if every RPC lane queue fails to yield a task.
  fn steal_rpc_work(&self) -> Option<Arc<LaburnumTask<P, T>>> {
    (RPC_LANE_HIGH_IDX..=RPC_LANE_LOW_IDX)
      .find_map(|lane_idx| self.scheduler.lane_queues[lane_idx].pop().ok())
  }

  /// Looks for work in the lane queues that come before `current_lane`.
  ///
  /// Despite the query-style name, this removes the task it finds: the first
  /// queue in `0..lane_priority(current_lane)` that yields a task has it
  /// popped and returned. Returns `None` if none of them yields one.
  ///
  /// NOTE(review): assumes `lane_priority` returns the lane's index into
  /// `lane_queues` — confirm against the `lanes` module.
  fn has_higher_priority_work(
    &self,
    current_lane: Lane,
  ) -> Option<Arc<LaburnumTask<P, T>>> {
    let exclusive_end = lane_priority(current_lane) as usize;
    (0..exclusive_end)
      .find_map(|idx| self.scheduler.lane_queues[idx].pop().ok())
  }

  /// Takes the next task from the shared queue belonging to `lane`, or
  /// returns `None` if that queue does not yield one.
  ///
  /// Not called anywhere yet (hence the `dead_code` allowance); kept as the
  /// same-lane counterpart to `steal_rpc_work` and
  /// `has_higher_priority_work` for unfinished same-lane work-stealing.
  #[allow(dead_code)]
  fn steal_same_lane_work(
    &self,
    lane: Lane,
  ) -> Option<Arc<LaburnumTask<P, T>>> {
    let queue = &self.scheduler.lane_queues[lane_priority(lane) as usize];
    match queue.pop() {
      | Ok(task) => Some(task),
      | Err(_) => None,
    }
  }
}

impl<P: Partitions, T: LanguageServer<P>> Worker<P, T> {
  /// Idles the worker thread when no work was found.
  ///
  /// Returns immediately if shutdown has been requested; otherwise parks the
  /// current thread for at most 100 ms, or until it is unparked.
  fn park(&mut self) {
    let shutting_down = self.scheduler.shutdown_flag.load(Ordering::Acquire);
    if !shutting_down {
      // The timeout bounds how long a missed wake-up can delay the next
      // queue scan and shutdown check.
      thread::park_timeout(std::time::Duration::from_millis(100));
    }
  }
}