// laburnum 1.17.0
//
// An LSP framework for building language servers and compilers, powered by an incremental query tree with content-addressed storage, task-based dataflow, and parallel queries.
// Documentation
// Copyright Two Neutron Stars Incorporated and contributors
// SPDX-License-Identifier: BlueOak-1.0.0

//! Reference counting garbage collector reaper.
//!
//! The reaper processes deferred decrements from an epoch-ordered queue.
//! Decrements are deferred to ensure snapshot isolation - tasks that started
//! before an index update should still be able to access the old record.
//!
//! # Epoch-Based Deferral
//!
//! When an index entry is overwritten, the old hash is queued for decrement
//! with the current epoch. The reaper only processes decrements from epochs
//! older than the oldest running task, ensuring no use-after-free.
//!
//! # Cascading Deletes
//!
//! When a record's refcount drops to zero and it's removed, all records it
//! references also have their refcounts decremented. These cascaded decrements
//! are queued rather than processed inline to spread the work and avoid
//! latency spikes.
//!
//! For cascading to work, record types must implement `CollectReferences<P>`
//! for the `Partitions` type being used. The default implementation collects
//! no references (suitable for leaf records).
//!
//! # Required Trait Bounds
//!
//! For full cascading support, `P::Stores` must implement:
//! - `RefcountOps` - for increment/decrement/remove operations
//! - `CollectCascadingRefs<P>` - for collecting references from records

use {
  crate::{
    ContentHash,
    Ident,
    database::{
      GenerationEpoch,
      partitions::{CollectCascadingRefs, RefcountOps},
      storage::Partitions,
    },
  },
  parking_lot::Mutex,
  std::{
    collections::VecDeque,
    sync::Arc,
  },
};

/// A refcount decrement that has been recorded but not yet applied.
///
/// Entries are produced by `Reaper::queue_decrement`, by the reaper's
/// cascading step when a record is removed, and by `RefCountDecrementer`.
/// They wait in the reaper's queue until `Reaper::reap` applies them.
#[derive(Debug, Clone)]
pub struct DeferredDecrement {
  /// Key of the partition that holds the record.
  pub partition: Ident,
  /// Content hash identifying the record within that partition.
  pub hash: ContentHash,
  /// The epoch this decrement was queued with.
  ///
  /// `Reaper::reap` only applies the entry once this epoch is strictly
  /// older than the oldest running epoch it is given, which is what
  /// preserves snapshot isolation for tasks still running at this epoch.
  pub from_epoch: GenerationEpoch,
}

/// Reaper for processing deferred refcount decrements.
///
/// Holds a FIFO queue of [`DeferredDecrement`]s together with a shared handle
/// to the partition stores they apply to. Reaping consumes the queue from the
/// front and stops at the first entry whose epoch is not yet older than the
/// oldest running epoch, so entries behind it wait as well.
///
/// # Thread Safety
///
/// The queue is guarded by a mutex, so decrements can be enqueued through
/// `&self` from multiple threads. Reaping itself is designed to be driven
/// from a single thread (typically a background task).
pub struct Reaper<P: Partitions> {
  /// Pending decrements; new entries are appended at the back.
  queue: Mutex<VecDeque<DeferredDecrement>>,
  /// Shared handle to the stores whose refcounts are adjusted.
  stores: Arc<P::Stores>,
}

impl<P: Partitions> Reaper<P>
where
  P::Stores: RefcountOps,
{
  /// Build a reaper with an empty queue that operates on `stores`.
  pub fn new(stores: Arc<P::Stores>) -> Self {
    let queue = Mutex::new(VecDeque::new());
    Self { queue, stores }
  }

  /// Defer a single decrement of the record `hash` in `partition`.
  ///
  /// The entry is tagged with `epoch` and appended to the back of the
  /// queue; it is not applied until all tasks from `epoch` have completed.
  pub fn queue_decrement(&self, partition: Ident, hash: ContentHash, epoch: GenerationEpoch) {
    let entry = DeferredDecrement {
      partition,
      hash,
      from_epoch: epoch,
    };
    self.queue.lock().push_back(entry);
  }

  /// Append a batch of already-built decrements under a single lock
  /// acquisition, preserving the order in which the iterator yields them.
  pub fn queue_decrements(&self, decrements: impl IntoIterator<Item = DeferredDecrement>) {
    self.queue.lock().extend(decrements);
  }

  /// Number of decrements currently waiting in the queue.
  pub fn queue_len(&self) -> usize {
    let pending = self.queue.lock();
    pending.len()
  }

  /// Whether no decrements are currently waiting in the queue.
  pub fn is_empty(&self) -> bool {
    let pending = self.queue.lock();
    pending.is_empty()
  }

  /// Borrow the stores this reaper operates on.
  pub fn stores(&self) -> &P::Stores {
    &*self.stores
  }
}

impl<P: Partitions> Reaper<P>
where
  P::Stores: RefcountOps + CollectCascadingRefs<P>,
{
  /// Apply queued decrements whose epoch is older than
  /// `oldest_running_epoch`, handling at most `budget` entries.
  ///
  /// Entries are taken from the front of the queue one at a time. The call
  /// stops as soon as the budget is spent, the queue is empty, or the entry
  /// at the front is not yet older than `oldest_running_epoch`. Decrements
  /// queued by cascading during this call count against the same budget;
  /// whatever is left over is picked up by a later call.
  ///
  /// Returns the number of records removed (whose refcount dropped to zero).
  ///
  /// # Cascading
  ///
  /// For every record whose refcount reaches zero, the records it
  /// references are collected, a decrement is queued for each of them with
  /// the same epoch as the triggering decrement, and then the record itself
  /// is removed.
  pub fn reap(&self, oldest_running_epoch: GenerationEpoch, budget: usize) -> usize {
    let mut removed = 0;

    for _ in 0..budget {
      // Hold the lock only while inspecting and popping the head, so that
      // `do_decrement` can re-lock the queue to push cascaded entries.
      let next = {
        let mut pending = self.queue.lock();
        let head_is_ready = matches!(
          pending.front(),
          Some(head) if head.from_epoch < oldest_running_epoch
        );
        if head_is_ready {
          pending.pop_front()
        } else {
          None
        }
      };

      let Some(entry) = next else {
        break;
      };

      if self.do_decrement(entry) {
        removed += 1;
      }
    }

    removed
  }

  /// Apply one decrement, cascading to referenced records if needed.
  ///
  /// Returns `true` when the record's refcount reached zero and the record
  /// was removed, `false` when the record is still referenced.
  fn do_decrement(&self, dd: DeferredDecrement) -> bool {
    let DeferredDecrement {
      partition,
      hash,
      from_epoch,
    } = dd;

    if self.stores.decrement_refcount(partition, hash) != 0 {
      return false;
    }

    // The references must be read while the record still exists.
    let children = self.stores.collect_cascading_refs(partition, hash);

    // Cascaded decrements inherit the epoch of the decrement that caused them.
    if !children.is_empty() {
      self.queue.lock().extend(children.into_iter().map(
        |(child_partition, child_hash)| DeferredDecrement {
          partition: child_partition,
          hash: child_hash,
          from_epoch,
        },
      ));
    }

    // Only now drop the record itself.
    self.stores.remove_record(partition, hash);
    true
  }
}

impl<P: Partitions> Reaper<P> {
  /// Number of queued decrements; usable without the `RefcountOps` bound.
  pub fn queue_len_inner(&self) -> usize {
    let pending = self.queue.lock();
    pending.len()
  }

  /// Whether the queue holds no decrements; usable without the
  /// `RefcountOps` bound.
  pub fn is_empty_inner(&self) -> bool {
    let pending = self.queue.lock();
    pending.is_empty()
  }
}

/// Debug output shows only the current queue length; the stores are omitted
/// and the struct is rendered as non-exhaustive.
impl<P: Partitions> std::fmt::Debug for Reaper<P> {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    let pending = self.queue_len_inner();
    let mut out = f.debug_struct("Reaper");
    out.field("queue_len", &pending);
    out.finish_non_exhaustive()
  }
}

/// Collector for incrementing refcounts when a record is inserted.
///
/// Implements the `References` trait to increment refcounts for all
/// records that the new record references.
pub struct RefCountIncrementer<'a, P: Partitions>
where
  P::Stores: RefcountOps,
{
  stores: &'a P::Stores,
}

impl<'a, P: Partitions> RefCountIncrementer<'a, P>
where
  P::Stores: RefcountOps,
{
  /// Create a new incrementer for the given stores.
  pub fn new(stores: &'a P::Stores) -> Self {
    Self { stores }
  }
}

impl<'a, P: Partitions> crate::record::References<P> for RefCountIncrementer<'a, P>
where
  P::Stores: RefcountOps,
{
  /// Increment the refcount of the record behind `handle`, addressed by the
  /// partition key of `Part` and the handle's content hash.
  fn add<Part: crate::database::Partition>(&mut self, handle: crate::database::RecordHandle<Part>)
  where
    P::Stores: crate::database::HasPartition<Part>,
  {
    let target = handle.content_hash();
    self.stores.increment_refcount(Part::KEY, target);
  }
}

/// Collector for queueing refcount decrements when a record is removed.
///
/// Implements the `References` trait: every handle reported through `add`
/// is turned into a [`DeferredDecrement`] and appended to a caller-owned
/// vector. Nothing is applied to any store here; the collected entries are
/// only recorded.
pub struct RefCountDecrementer<'a> {
  /// Caller-owned buffer that receives one entry per reported reference.
  decrements: &'a mut Vec<DeferredDecrement>,
  /// Epoch stamped onto every entry this collector produces.
  epoch: GenerationEpoch,
}

impl<'a> RefCountDecrementer<'a> {
  /// Create a new decrementer that queues to the given vector.
  pub fn new(decrements: &'a mut Vec<DeferredDecrement>, epoch: GenerationEpoch) -> Self {
    Self { decrements, epoch }
  }
}

impl<P: Partitions> crate::record::References<P> for RefCountDecrementer<'_> {
  /// Record a deferred decrement for the record behind `handle`, addressed
  /// by the partition key of `Part` and stamped with this collector's epoch.
  fn add<Part: crate::database::Partition>(&mut self, handle: crate::database::RecordHandle<Part>)
  where
    P::Stores: crate::database::HasPartition<Part>,
  {
    let entry = DeferredDecrement {
      partition: Part::KEY,
      hash: handle.content_hash(),
      from_epoch: self.epoch,
    };
    self.decrements.push(entry);
  }
}

#[cfg(test)]
mod tests {
  use super::*;

  /// A `DeferredDecrement` built from a struct literal exposes the partition
  /// and epoch it was constructed with.
  #[test]
  fn deferred_decrement_stores_fields() {
    let partition = Ident::new("test");
    let hash = ContentHash::new(&[1, 2, 3]);
    let from_epoch = GenerationEpoch::new(42);

    let dd = DeferredDecrement {
      partition,
      hash,
      from_epoch,
    };

    assert_eq!(dd.partition, Ident::new("test"));
    assert_eq!(dd.from_epoch, GenerationEpoch::new(42));
  }
}