laburnum 1.17.1

An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

use {
  crate::{
    ContentHash,
    database::{
      ContentHashRef,
      GenerationEpoch,
      HasPartition,
      Partition,
      Partitions,
      handle::RecordHandle,
    },
    record::{
      CollectReferences,
      Record,
      References,
    },
  },
  dashmap::DashSet,
  parking_lot::Mutex,
  std::{
    collections::VecDeque,
    marker::PhantomData,
    sync::atomic::{
      AtomicU64,
      AtomicU8,
      Ordering,
    },
    time::Duration,
  },
};

/// Encoded phase: no collection cycle is in progress.
const GC_PHASE_IDLE: u8 = 0;
/// Encoded phase: reachable records are being traced from the roots.
const GC_PHASE_MARKING: u8 = 1;
/// Encoded phase: records that were not reached are being removed.
const GC_PHASE_SWEEPING: u8 = 2;

/// Incremental, concurrent tri-color mark-sweep collector for
/// content-addressed records.
///
/// A collection cycle steps through three phases, tracked in `phase`:
/// 1. **Idle** — no collection is running.
/// 2. **Marking** — reachable records are traced outward from the roots
///    (index values) handed to `start_marking`.
/// 3. **Sweeping** — each partition store drops the records that were not
///    reached.
///
/// # Tri-Color Abstraction
///
/// - **White**: not reached yet; a candidate for collection.
/// - **Gray**: discovered but not yet traced; waiting in the `gray` queue.
/// - **Black**: traced and known reachable; recorded in the `black` set.
///
/// # Partition-Aware Marking
///
/// Every gray entry carries the key of the partition it belongs to, and a
/// `mark_tick_partition` call only consumes entries carrying its own
/// partition's key. A pass over one partition therefore never takes work
/// that belongs to a different partition.
///
/// # Write Barrier
///
/// Records written while marking is underway must not be mistaken for
/// garbage. The commit path is expected to feed the partition-tagged hashes
/// of newly committed records to `add_to_gray` while `is_marking` is true.
/// Hashes that are already black are skipped.
///
/// # Incremental Collection
///
/// `mark_tick_partition` performs at most a fixed budget of work per call.
/// Marking can therefore be interleaved with normal operations instead of
/// stalling them.
///
/// NOTE(review): `black` is keyed by hash alone, not by (partition, hash).
/// A hash that appears in two partitions is traced in only one of them —
/// confirm that hashes cannot collide across partitions.
pub struct GarbageCollector {
  /// Current phase; always one of the `GC_PHASE_*` constants.
  phase:    AtomicU8,
  /// Cycle counter, advanced once per successfully started marking phase.
  gc_epoch: AtomicU64,
  /// Discovered-but-untraced records, each tagged with its partition key.
  gray:     Mutex<VecDeque<ContentHashRef>>,
  /// Hashes already traced during the current cycle.
  black:    DashSet<ContentHash>,
}

/// Tuning knobs that decide when garbage collection is triggered and how
/// much work each incremental step is allowed to do.
#[derive(Debug, Clone)]
pub struct GcPolicy {
  /// Record count above which a collection should be triggered.
  /// Default: 100,000 records.
  pub record_threshold: Option<usize>,

  /// Number of commits after which a collection should be triggered.
  /// Default: 1,000 commits.
  pub commit_threshold: Option<usize>,

  /// Length of idle time after which a collection should be triggered.
  /// Default: 5 seconds.
  pub idle_timeout: Option<Duration>,

  /// Interval at which a collection runs regardless of the other triggers.
  /// Default: 60 seconds.
  pub periodic_interval: Duration,

  /// Number of records `mark_tick` may process in a single tick.
  /// Default: 1000.
  pub mark_budget: usize,

  /// Upper bound on the decrements the reaper handles in one call.
  ///
  /// This keeps the latency of cascading deletes in check, e.g. a long chain
  /// of records where each references the next. Work past the budget stays
  /// queued and is resumed on the next reaper pass.
  /// Default: 10,000.
  pub reap_budget: usize,
}

impl Default for GcPolicy {
  fn default() -> Self {
    Self {
      record_threshold:  Some(100_000),
      commit_threshold:  Some(1_000),
      idle_timeout:      Some(Duration::from_secs(5)),
      periodic_interval: Duration::from_secs(60),
      mark_budget:       1000,
      reap_budget:       10_000,
    }
  }
}

impl GarbageCollector {
  /// Create an idle collector: epoch 0, an empty gray queue and an empty
  /// black set.
  pub fn new() -> Self {
    Self {
      phase:    AtomicU8::new(GC_PHASE_IDLE),
      gc_epoch: AtomicU64::new(0),
      gray:     Mutex::new(VecDeque::new()),
      black:    DashSet::new(),
    }
  }

  /// Start the marking phase with the given root hashes.
  ///
  /// Roots are typically collected from all partition indexes using
  /// `Partitions::collect_index_hashes()` — these are the "live" records
  /// that should not be collected. Each root is tagged with its partition
  /// key so the GC routes it to the correct partition store.
  ///
  /// On success this advances the GC epoch, clears the black set, and
  /// replaces the gray queue's contents with `roots`.
  ///
  /// Returns `false`, without changing anything, if a GC cycle is already
  /// in progress.
  pub fn start_marking(
    &self,
    roots: impl Iterator<Item = ContentHashRef>,
  ) -> bool {
    // Hold the gray lock across the whole transition. `add_to_gray`
    // re-checks the phase under this same lock, so a write barrier that
    // observes `MARKING` can only push after the queue has been reset and
    // seeded below. Its entries can no longer be wiped by `clear()`.
    let mut gray = self.gray.lock();

    if self.phase.compare_exchange(
      GC_PHASE_IDLE,
      GC_PHASE_MARKING,
      Ordering::SeqCst,
      Ordering::SeqCst,
    ).is_err() {
      return false;
    }

    self.gc_epoch.fetch_add(1, Ordering::SeqCst);
    self.black.clear();

    // Discard anything left over from an earlier cycle, then seed the roots.
    gray.clear();
    gray.extend(roots);

    true
  }

  /// Trace up to `budget` gray records belonging to partition `Part`.
  ///
  /// Each traced record is marked black, and its not-yet-black children are
  /// queued as gray.
  ///
  /// Only gray items tagged with `Part::KEY` are taken. Items for other
  /// partitions are left in the queue, in order, for their own
  /// `mark_tick_partition` call.
  ///
  /// Uses `CollectReferences<P>` to discover child records with their
  /// partition type information, ensuring cross-partition references are
  /// correctly tagged in the gray queue.
  ///
  /// Returns `true` when the entire gray queue (all partitions) is empty
  /// after this tick, or when no marking phase is active.
  ///
  /// NOTE(review): children discovered during a tick are buffered locally
  /// and only published at the end of the tick. If ticks can run
  /// concurrently with `finish_marking`, the queue may appear empty while
  /// children are still in flight — confirm that callers serialize them.
  pub fn mark_tick_partition<P, Part>(
    &self,
    stores: &P::Stores,
    budget: usize,
  ) -> bool
  where
    P: Partitions,
    Part: Partition + 'static,
    Part::Record: Record + CollectReferences<P>,
    P::Stores: HasPartition<Part>,
  {
    if self.phase.load(Ordering::SeqCst) != GC_PHASE_MARKING {
      return true;
    }

    // Claim up to `budget` not-yet-black items for this partition in a
    // single pass under one lock acquisition. Scanning from the front and
    // calling `VecDeque::remove` once per record costs O(queue length) each
    // time, i.e. O(budget * queue length) per tick.
    let batch: Vec<ContentHashRef> = {
      let mut gray = self.gray.lock();
      let mut batch = Vec::new();
      if budget > 0 {
        let mut rest = VecDeque::with_capacity(gray.len());
        for item in gray.drain(..) {
          if batch.len() < budget && item.partition == Part::KEY {
            // Items that are already black need no more work; drop them.
            if !self.black.contains(&item.hash) {
              batch.push(item);
            }
          } else {
            rest.push_back(item);
          }
        }
        *gray = rest;
      }
      batch
    };

    let part_store = <P::Stores as HasPartition<Part>>::store(stores);
    let mut discovered: Vec<ContentHashRef> = Vec::new();

    for item in batch {
      // `insert` reports whether the hash was newly added. A duplicate entry
      // in this batch, or a hash blackened by another thread, is therefore
      // traced at most once, with no gap between "check" and "mark".
      if !self.black.insert(item.hash) {
        continue;
      }

      if let Some(record_ref) = part_store.get(&item.hash)
        && let Some(record) = record_ref.record()
      {
        let mut collector = GcRefCollector::<P>::new();
        record.collect_references(&mut collector);
        discovered.extend(
          collector
            .refs
            .into_iter()
            .filter(|child| !self.black.contains(&child.hash)),
        );
      }
    }

    // Publish the children and report emptiness under one lock acquisition.
    let mut gray = self.gray.lock();
    gray.extend(discovered);
    gray.is_empty()
  }

  /// Finish the marking phase if the gray queue has been drained.
  ///
  /// While marking: if the gray queue is empty, switches to the sweeping
  /// phase and returns `true`. Otherwise it leaves the phase unchanged and
  /// returns `false`. The check and the transition happen under the gray
  /// lock.
  ///
  /// Outside the marking phase there is nothing to finish. The phase is left
  /// untouched and `true` is returned, mirroring `mark_tick_partition`.
  pub fn finish_marking(&self) -> bool {
    let gray = self.gray.lock();

    if self.phase.load(Ordering::SeqCst) != GC_PHASE_MARKING {
      // Never jump from idle straight to sweeping. The black set is empty
      // then (cleared by `finish_sweep` / `start_marking`), so
      // `sweep_partition` would drop every record older than the epoch.
      return true;
    }

    if !gray.is_empty() {
      return false;
    }

    self.phase.store(GC_PHASE_SWEEPING, Ordering::SeqCst);
    true
  }

  /// Sweep one partition's store.
  ///
  /// A record is kept if its epoch is at or after the current GC epoch, or
  /// if its hash is black. Everything else is removed.
  ///
  /// Call this for each partition after `finish_marking` has succeeded. It
  /// is a no-op outside the sweeping phase.
  pub fn sweep_partition<P, Part>(&self, stores: &P::Stores)
  where
    P: Partitions,
    Part: Partition + 'static,
    P::Stores: HasPartition<Part>,
  {
    if self.phase.load(Ordering::SeqCst) != GC_PHASE_SWEEPING {
      return;
    }

    let gc_epoch = GenerationEpoch::new(self.gc_epoch.load(Ordering::SeqCst));
    let black = &self.black;
    let part_store = <P::Stores as HasPartition<Part>>::store(stores);

    part_store.retain(|hash, record_epoch| {
      if record_epoch >= gc_epoch {
        true
      } else {
        black.contains(&hash)
      }
    });
  }

  /// Complete the GC cycle: return to idle and clear the black set.
  pub fn finish_sweep(&self) {
    self.phase.store(GC_PHASE_IDLE, Ordering::SeqCst);
    self.black.clear();
  }

  /// Check if GC is currently in the marking phase.
  ///
  /// Used by the commit protocol to decide whether to apply write barriers.
  pub fn is_marking(&self) -> bool {
    self.phase.load(Ordering::SeqCst) == GC_PHASE_MARKING
  }

  /// Check if GC is in any active phase (not idle).
  pub fn is_active(&self) -> bool {
    self.phase.load(Ordering::SeqCst) != GC_PHASE_IDLE
  }

  /// Check if a hash has been marked black (reachable).
  ///
  /// Used by write barriers to avoid re-adding already-processed hashes.
  pub fn is_black(&self, hash: &ContentHash) -> bool {
    self.black.contains(hash)
  }

  /// Add partition-tagged hashes to the gray queue (write barrier).
  ///
  /// Call this during commit when GC is marking. Hashes that are not yet
  /// black are queued so they get traced. Outside the marking phase this
  /// does nothing.
  pub fn add_to_gray(&self, refs: impl Iterator<Item = ContentHashRef>) {
    // Lock-free fast path for the common "not marking" case.
    if self.phase.load(Ordering::SeqCst) != GC_PHASE_MARKING {
      return;
    }

    let mut gray = self.gray.lock();

    // Re-check under the lock. `start_marking` and `finish_marking` change
    // the phase while holding it, so entries can never be queued after
    // marking has finished, where nothing would ever trace them.
    if self.phase.load(Ordering::SeqCst) != GC_PHASE_MARKING {
      return;
    }

    gray.extend(refs.filter(|r| !self.black.contains(&r.hash)));
  }

  /// Get the current GC epoch.
  ///
  /// The epoch is advanced each time a marking phase starts.
  /// `sweep_partition` keeps any record whose epoch is at or after this
  /// value, regardless of reachability.
  pub fn epoch(&self) -> GenerationEpoch {
    GenerationEpoch::new(self.gc_epoch.load(Ordering::SeqCst))
  }

  /// Get the current GC phase.
  pub fn phase(&self) -> GcPhase {
    match self.phase.load(Ordering::SeqCst) {
      | GC_PHASE_IDLE => GcPhase::Idle,
      | GC_PHASE_MARKING => GcPhase::Marking,
      | GC_PHASE_SWEEPING => GcPhase::Sweeping,
      // Only the three constants above are ever stored; treat anything else
      // as idle.
      | _ => GcPhase::Idle,
    }
  }

  /// Get the size of the gray queue.
  pub fn gray_queue_len(&self) -> usize {
    self.gray.lock().len()
  }

  /// Get the number of black (marked) records.
  pub fn black_set_len(&self) -> usize {
    self.black.len()
  }
}

impl Default for GarbageCollector {
  fn default() -> Self {
    Self::new()
  }
}

/// Phase of a garbage-collection cycle, as reported by
/// `GarbageCollector::phase`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GcPhase {
  /// No collection cycle is running.
  Idle,
  /// Reachable records are being traced from the roots.
  Marking,
  /// Records that were not reached are being removed.
  Sweeping,
}

/// Reference sink used while tracing a record during GC marking.
///
/// Acts as the `References<P>` target handed to a record's
/// `CollectReferences` implementation. It accumulates every reported
/// handle as a partition-tagged content hash.
struct GcRefCollector<P: Partitions> {
  /// Partition-tagged hashes reported so far, in the order they were added.
  refs:    Vec<ContentHashRef>,
  /// Ties the collector to the partition set `P` without storing a value.
  _marker: PhantomData<P>,
}

impl<P: Partitions> GcRefCollector<P> {
  fn new() -> Self {
    Self {
      refs:    Vec::new(),
      _marker: PhantomData,
    }
  }
}

impl<P: Partitions> References<P> for GcRefCollector<P> {
  /// Record the handle's content hash, tagged with the key of partition
  /// `Part`.
  fn add<Part: Partition>(
    &mut self,
    handle: RecordHandle<Part>,
  ) where
    P::Stores: HasPartition<Part>,
  {
    let hash = handle.content_hash();
    let tagged = ContentHashRef::new(Part::KEY, hash);
    self.refs.push(tagged);
  }
}