laburnum 1.17.1

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  crate::{
    Partitions, TRACER,
    database::{
      chunk::RecordWriter,
      gc::GcPolicy,
    },
    protocol::lsp::LanguageServer, scheduler::task::TaskContext,
  },
  std::{
    future::Future,
    pin::Pin,
    sync::{
      Arc,
      atomic::{AtomicBool, Ordering},
    },
    time::{Duration, Instant},
  },
};

/// Boxed, sendable future produced by a GC task. It resolves to an optional
/// `RecordWriter` for the partition set `P`.
type GcTaskFuture<P> =
  Pin<Box<dyn Future<Output = Option<RecordWriter<P>>> + Send>>;

/// One-shot, sendable constructor for a GC task. Given the task's
/// `TaskContext`, it yields the boxed future that performs the work.
type GcTaskFn<P, T> =
  Box<dyn FnOnce(TaskContext<P, T>) -> GcTaskFuture<P> + Send + 'static>;

/// Creates a periodic garbage collection task that runs the reaper and
/// mark-sweep collector.
///
/// The task loops until `shutdown_flag` is set, sleeping 100 ms between
/// iterations. Each iteration handles two GC mechanisms:
///
/// 1. **Reaper**: Processes deferred refcount decrements from index overwrites.
///    Runs every iteration when the reaper queue is non-empty. Each pass
///    processes at most `reap_budget` decrements whose epoch is older than the
///    oldest running task.
///
/// 2. **Mark-sweep**: Full tri-color mark-sweep that catches orphaned records
///    the reaper missed. A cycle starts only when no cycle is already active
///    and at least one `GcPolicy` trigger fires:
///    - the periodic interval has elapsed since the last completed cycle;
///    - the idle timeout (if configured) has elapsed since the last reaper
///      removal or completed cycle;
///    - the commit threshold (if configured) has been reached since the last
///      completed cycle.
///
///    Marking runs in `mark_budget`-sized ticks and yields to the executor
///    between ticks.
///
/// `shutdown_flag` is read with `Acquire` ordering at every wake-up and between
/// mark ticks. A cycle interrupted during marking is closed with `finish_sweep`
/// before the task exits. The returned future always resolves to `None`.
pub(crate) fn periodic_gc_task<P: Partitions, T: LanguageServer<P>>(
  shutdown_flag: Arc<AtomicBool>,
) -> GcTaskFn<P, T> {
  /// Delay between successive trigger checks.
  const POLL_INTERVAL: Duration = Duration::from_millis(100);

  Box::new(move |ctx| {
    Box::pin({
      let scheduler = ctx.scheduler();

      async move {
        let policy = GcPolicy::default();
        let mut last_gc_time = Instant::now();
        // Database epoch recorded when the last mark-sweep cycle completed
        // (0 before the first one). The commit trigger compares the current
        // epoch against it.
        let mut epoch_at_last_gc: usize = 0;
        // Start of the current idle period. Reset whenever the reaper frees
        // something and whenever a mark-sweep cycle completes.
        let mut last_idle_activity = Instant::now();

        loop {
          smol::Timer::after(POLL_INTERVAL).await;

          if shutdown_flag.load(Ordering::Acquire) {
            break;
          }

          // --- Reaper pass ---
          let oldest_epoch = scheduler.oldest_running_epoch();
          if !scheduler.reaper.is_empty() {
            let removed = {
              otel::span!("laburnum.gc.reaper");
              scheduler.reaper.reap(oldest_epoch, policy.reap_budget)
            };
            if removed > 0 {
              otel::event!(
                "laburnum.gc.reaper.removed",
                "removed_count" = removed as i64
              );
              last_idle_activity = Instant::now();
            }
          }

          // --- Mark-sweep trigger check ---
          let current_epoch = scheduler.db.get_current_epoch();
          // NOTE(review): `as usize` truncates on 32-bit targets if the epoch
          // is a 64-bit value.
          let commits_delta =
            (current_epoch.get() as usize).saturating_sub(epoch_at_last_gc);

          let should_gc = {
            let periodic_due =
              last_gc_time.elapsed() >= policy.periodic_interval;

            let idle_trigger = policy
              .idle_timeout
              .map(|timeout| last_idle_activity.elapsed() >= timeout)
              .unwrap_or(false);

            let commit_trigger = policy
              .commit_threshold
              .map(|threshold| commits_delta >= threshold)
              .unwrap_or(false);

            periodic_due || idle_trigger || commit_trigger
          };

          if !should_gc || scheduler.gc.is_active() {
            continue;
          }

          // --- Mark-sweep cycle ---
          {
            otel::span!("laburnum.gc.mark_sweep");

            // Phase 1: Start marking from index roots. If the collector
            // refuses to start, retry on the next wake-up.
            let roots = scheduler.db.collect_index_hashes();
            if !scheduler.gc.start_marking(roots.into_iter()) {
              continue;
            }
          }

          // Phase 2: Mark incrementally, yielding between budgeted ticks so
          // other work on the executor is not starved.
          loop {
            if shutdown_flag.load(Ordering::Acquire) {
              // Close the half-finished cycle before shutting down.
              scheduler.gc.finish_sweep();
              break;
            }

            let done = P::gc_mark_tick(
              &scheduler.gc,
              scheduler.db.get_store().stores(),
              policy.mark_budget,
            );

            if done {
              break;
            }

            smol::future::yield_now().await;
          }

          if shutdown_flag.load(Ordering::Acquire) {
            break;
          }

          // Phase 3: Transition to sweeping. On failure, close the cycle
          // without sweeping.
          if !scheduler.gc.finish_marking() {
            scheduler.gc.finish_sweep();
            continue;
          }

          // Phase 4: Sweep each partition
          P::gc_sweep(&scheduler.gc, scheduler.db.get_store().stores());

          // Phase 5: Complete the cycle
          scheduler.gc.finish_sweep();

          last_gc_time = Instant::now();
          epoch_at_last_gc = current_epoch.get() as usize;
          // Restart the idle clock as well. Otherwise, once the idle timeout
          // has elapsed, the idle trigger stays true and a full mark-sweep
          // would start on every wake-up while the server remains idle.
          last_idle_activity = last_gc_time;

          otel::event!("laburnum.gc.mark_sweep.complete");
        }

        None
      }
    })
  })
}

/// Creates an on-demand garbage collection task that runs a single reaper
/// pass.
///
/// Unlike `periodic_gc_task`, this task does NOT run mark-sweep. If the reaper
/// queue is non-empty, it processes at most `GcPolicy::default().reap_budget`
/// deferred refcount decrements whose epoch is older than the oldest running
/// task. It then resolves to `None`.
///
/// It is intended for prompt cleanup after a task writes to the database.
/// NOTE(review): no caller is visible in this file; confirm where (or whether)
/// it is scheduled.
///
/// `_parent_trace_context` is accepted for API compatibility but is currently
/// ignored.
// The allow silences the unused-function lint; this constructor is not
// referenced from this file.
#[allow(dead_code)]
pub(crate) fn on_demand_gc_task<P: Partitions, T: LanguageServer<P>>(
  _parent_trace_context: crate::protocol::otel::TraceContext,
) -> GcTaskFn<P, T> {
  Box::new(move |ctx| {
    Box::pin({
      let scheduler = ctx.scheduler();

      async move {
        // Run a single, budget-limited reaper pass.
        let oldest_epoch = scheduler.oldest_running_epoch();
        if !scheduler.reaper.is_empty() {
          let removed = {
            otel::span!("laburnum.gc.on_demand");
            scheduler.reaper.reap(oldest_epoch, GcPolicy::default().reap_budget)
          };
          if removed > 0 {
            otel::event!(
              "laburnum.gc.on_demand.removed",
              "removed_count" = removed as i64
            );
          }
        }

        None
      }
    })
  })
}