use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crossbeam_channel::{bounded, Receiver, Sender};
use dashmap::DashMap;
use parking_lot::RwLock;
use super::error::{PersistentARTrieError, Result};
use super::wal::{WalRecord, WalWriter};
/// Tuning parameters for [`VersionGcRegistry`].
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Interval at which the background worker attempts a GC cycle.
    pub gc_interval: Duration,
    /// A cycle never shrinks the pending-candidate queue below this many entries.
    pub min_retained_versions: usize,
    /// Upper bound on retained versions.
    ///
    /// NOTE(review): not read by the GC logic in this module; confirm whether
    /// another component enforces it.
    pub max_retained_versions: usize,
    /// How long a candidate must have existed (since `add_gc_candidate`) before
    /// it may be collected.
    pub grace_period: Duration,
    /// Upper bound on the number of versions collected by a single cycle.
    pub max_gc_per_cycle: usize,
    /// When `true`, [`VersionGcRegistry::new`] spawns a dedicated worker thread.
    pub background_gc: bool,
}

impl Default for GcConfig {
    /// Production defaults: 1 s interval, 5 s grace period, background worker on.
    fn default() -> Self {
        let one_second = Duration::from_millis(1_000);
        Self {
            gc_interval: one_second,
            min_retained_versions: 5,
            max_retained_versions: 100,
            grace_period: one_second * 5,
            max_gc_per_cycle: 10,
            background_gc: true,
        }
    }
}

impl GcConfig {
    /// Short intervals and small limits for unit tests; no background thread is spawned.
    pub fn for_testing() -> Self {
        let tick = Duration::from_millis(100);
        Self {
            gc_interval: tick,
            min_retained_versions: 2,
            max_retained_versions: 10,
            grace_period: tick,
            max_gc_per_cycle: 5,
            background_gc: false,
        }
    }

    /// Faster cycles (500 ms), a shorter grace period (2 s), larger batches
    /// (50 per cycle) and more retained versions than [`GcConfig::default`].
    pub fn high_throughput() -> Self {
        Self {
            gc_interval: Duration::from_millis(500),
            min_retained_versions: 10,
            max_retained_versions: 1_000,
            grace_period: Duration::from_secs(2),
            max_gc_per_cycle: 50,
            ..Self::default()
        }
    }
}
/// A version queued as a candidate for garbage collection.
#[derive(Debug, Clone)]
pub struct GcCandidate {
/// Version id; a GC cycle will not collect it while readers are registered on it.
pub version_id: u64,
/// Stored with the candidate but not read by the GC logic in this module.
/// NOTE(review): presumably the version's root node pointer/offset — confirm.
pub root_ptr: u64,
/// When the candidate was created; the grace period is measured from this instant.
pub eligible_since: Instant,
/// Node count of the version; only used to estimate `GcStats::bytes_reclaimed`.
pub node_count: u64,
}
/// Cumulative GC counters plus snapshots taken at the end of the most recent
/// counted cycle.
#[derive(Debug, Clone, Default)]
pub struct GcStats {
/// Cycles that ran to completion. Cycles that bail out early (no recorded
/// modifications, or too few candidates to honour `min_retained_versions`)
/// are not counted.
pub cycles_run: u64,
/// Total versions removed from the candidate queue as collected.
pub versions_collected: u64,
/// Times an otherwise-eligible candidate was kept because it had registered
/// readers; the same candidate can be counted again in later cycles.
pub versions_skipped: u64,
/// Estimated (not measured) bytes reclaimed, derived from each collected
/// candidate's `node_count`.
pub bytes_reclaimed: u64,
/// Elapsed time of the most recent counted cycle.
pub last_cycle_duration: Duration,
/// Candidate-queue length at the end of the most recent counted cycle.
pub pending_candidates: usize,
/// Number of entries in the reader map at the end of the most recent counted cycle.
pub versions_with_readers: usize,
}
/// Commands sent to the background GC worker over its bounded channel.
#[derive(Debug)]
enum GcMessage {
/// Run a GC cycle when the worker receives this message.
RunCycle,
/// Stop the worker loop.
Shutdown,
/// Append the candidate to the registry's pending queue.
AddCandidate(GcCandidate),
}
/// Tracks registered readers per version and a queue of versions awaiting
/// garbage collection, optionally driven by a background worker thread.
#[derive(Debug)]
pub struct VersionGcRegistry {
/// Registered-reader count per version id.
active_readers: DashMap<u64, u64>,
/// Pending candidates in insertion order; GC cycles scan them front to back.
gc_candidates: RwLock<VecDeque<GcCandidate>>,
/// Configuration fixed at construction.
config: GcConfig,
/// Bumped by `record_modification`; a GC cycle returns immediately while it is zero.
modifications_since_gc: AtomicU64,
/// Set by `shutdown`; the worker loop exits once it observes it.
terminating: AtomicBool,
/// Running statistics returned by `stats()`.
stats: RwLock<GcStats>,
/// Command channel to the worker; `None` when `background_gc` is off.
worker_tx: Option<Sender<GcMessage>>,
/// Join handle of the worker thread; taken by `shutdown`.
worker_handle: RwLock<Option<JoinHandle<()>>>,
}
impl VersionGcRegistry {
pub fn new(config: GcConfig) -> Arc<Self> {
let (worker_tx, worker_rx) = if config.background_gc {
let (tx, rx) = bounded::<GcMessage>(1000);
(Some(tx), Some(rx))
} else {
(None, None)
};
let registry = Arc::new(Self {
active_readers: DashMap::new(),
gc_candidates: RwLock::new(VecDeque::new()),
config: config.clone(),
modifications_since_gc: AtomicU64::new(0),
terminating: AtomicBool::new(false),
stats: RwLock::new(GcStats::default()),
worker_tx,
worker_handle: RwLock::new(None),
});
if let Some(rx) = worker_rx {
let registry_clone = Arc::clone(®istry);
let interval = config.gc_interval;
let handle = thread::Builder::new()
.name("version-gc".to_string())
.spawn(move || {
Self::gc_worker_loop(registry_clone, rx, interval);
})
.expect("Failed to spawn GC worker thread");
*registry.worker_handle.write() = Some(handle);
}
registry
}
fn gc_worker_loop(registry: Arc<Self>, rx: Receiver<GcMessage>, interval: Duration) {
let mut last_cycle = Instant::now();
loop {
match rx.recv_timeout(interval) {
Ok(GcMessage::Shutdown) => {
break;
}
Ok(GcMessage::RunCycle) => {
registry.run_gc_cycle_internal();
last_cycle = Instant::now();
}
Ok(GcMessage::AddCandidate(candidate)) => {
let mut candidates = registry.gc_candidates.write();
candidates.push_back(candidate);
}
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if last_cycle.elapsed() >= interval {
registry.run_gc_cycle_internal();
last_cycle = Instant::now();
}
}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
break;
}
}
if registry.terminating.load(Ordering::Acquire) {
break;
}
}
}
pub fn add_reader(&self, version_id: u64) {
self.active_readers
.entry(version_id)
.and_modify(|count| *count += 1)
.or_insert(1);
}
pub fn remove_reader(&self, version_id: u64) {
if let Some(mut entry) = self.active_readers.get_mut(&version_id) {
*entry = entry.saturating_sub(1);
if *entry == 0 {
drop(entry);
self.active_readers.remove(&version_id);
}
}
}
pub fn reader_count(&self, version_id: u64) -> u64 {
self.active_readers
.get(&version_id)
.map(|v| *v)
.unwrap_or(0)
}
pub fn has_readers(&self, version_id: u64) -> bool {
self.reader_count(version_id) > 0
}
pub fn add_gc_candidate(&self, version_id: u64, root_ptr: u64, node_count: u64) {
let candidate = GcCandidate {
version_id,
root_ptr,
eligible_since: Instant::now(),
node_count,
};
if let Some(ref tx) = self.worker_tx {
let _ = tx.try_send(GcMessage::AddCandidate(candidate));
} else {
let mut candidates = self.gc_candidates.write();
candidates.push_back(candidate);
}
}
pub fn record_modification(&self) {
self.modifications_since_gc.fetch_add(1, Ordering::Release);
}
pub fn trigger_gc(&self) {
if let Some(ref tx) = self.worker_tx {
let _ = tx.try_send(GcMessage::RunCycle);
} else {
self.run_gc_cycle_internal();
}
}
pub fn run_gc_cycle(&self, wal: &mut WalWriter) -> Result<Vec<u64>> {
let collected = self.run_gc_cycle_internal();
if !collected.is_empty() {
let record = WalRecord::VersionGc {
version_ids: collected.clone(),
};
wal.append(record).map_err(|e| {
PersistentARTrieError::internal(format!("Failed to write VersionGc: {}", e))
})?;
}
Ok(collected)
}
fn run_gc_cycle_internal(&self) -> Vec<u64> {
let start = Instant::now();
let mods = self.modifications_since_gc.load(Ordering::Acquire);
if mods == 0 {
return Vec::new();
}
let mut collected = Vec::new();
let now = Instant::now();
let grace_period = self.config.grace_period;
let max_gc = self.config.max_gc_per_cycle;
let min_retain = self.config.min_retained_versions;
{
let mut candidates = self.gc_candidates.write();
let total_candidates = candidates.len();
if total_candidates <= min_retain {
return Vec::new(); }
let max_gc_for_retention = total_candidates - min_retain;
let mut remaining = VecDeque::new();
let mut gc_count = 0;
while let Some(candidate) = candidates.pop_front() {
if gc_count >= max_gc {
remaining.push_back(candidate);
continue;
}
if gc_count >= max_gc_for_retention {
remaining.push_back(candidate);
continue;
}
if now.duration_since(candidate.eligible_since) < grace_period {
remaining.push_back(candidate);
continue;
}
if self.has_readers(candidate.version_id) {
remaining.push_back(candidate);
let mut stats = self.stats.write();
stats.versions_skipped += 1;
continue;
}
collected.push(candidate.version_id);
gc_count += 1;
{
let mut stats = self.stats.write();
stats.versions_collected += 1;
stats.bytes_reclaimed += candidate.node_count * 200;
}
}
*candidates = remaining;
}
self.modifications_since_gc.store(0, Ordering::Release);
{
let mut stats = self.stats.write();
stats.cycles_run += 1;
stats.last_cycle_duration = start.elapsed();
stats.pending_candidates = self.gc_candidates.read().len();
stats.versions_with_readers = self.active_readers.len();
}
collected
}
pub fn stats(&self) -> GcStats {
self.stats.read().clone()
}
pub fn shutdown(&self) {
self.terminating.store(true, Ordering::Release);
if let Some(ref tx) = self.worker_tx {
let _ = tx.send(GcMessage::Shutdown);
}
if let Some(handle) = self.worker_handle.write().take() {
let _ = handle.join();
}
}
pub fn pending_count(&self) -> usize {
self.gc_candidates.read().len()
}
pub fn pending_versions(&self) -> Vec<u64> {
self.gc_candidates
.read()
.iter()
.map(|c| c.version_id)
.collect()
}
pub fn clear_candidates(&self) {
self.gc_candidates.write().clear();
}
}
impl Drop for VersionGcRegistry {
    /// Runs the same teardown as an explicit [`VersionGcRegistry::shutdown`] call.
    fn drop(&mut self) {
        let this: &Self = self;
        this.shutdown();
    }
}
/// RAII registration of one reader on a version.
///
/// [`ReaderGuard::new`] increments the version's reader count in the registry,
/// and dropping the guard decrements it again. That count is what
/// [`VersionGcRegistry::has_readers`] reports, and the GC cycle checks it before
/// collecting a version.
#[derive(Debug)]
#[must_use = "the reader is unregistered as soon as the guard is dropped; bind it to a named variable"]
pub struct ReaderGuard {
    version_id: u64,
    registry: Arc<VersionGcRegistry>,
}

impl ReaderGuard {
    /// Registers a reader on `version_id` and returns the guard that unregisters
    /// it when dropped.
    pub fn new(version_id: u64, registry: Arc<VersionGcRegistry>) -> Self {
        registry.add_reader(version_id);
        Self {
            version_id,
            registry,
        }
    }

    /// The version this guard keeps registered.
    #[inline]
    pub fn version_id(&self) -> u64 {
        self.version_id
    }
}

impl Drop for ReaderGuard {
    fn drop(&mut self) {
        self.registry.remove_reader(self.version_id);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_remove_reader() {
        let config = GcConfig::for_testing();
        let registry = VersionGcRegistry::new(config);
        assert_eq!(registry.reader_count(1), 0);
        registry.add_reader(1);
        assert_eq!(registry.reader_count(1), 1);
        registry.add_reader(1);
        assert_eq!(registry.reader_count(1), 2);
        registry.remove_reader(1);
        assert_eq!(registry.reader_count(1), 1);
        registry.remove_reader(1);
        assert_eq!(registry.reader_count(1), 0);
        assert!(!registry.has_readers(1));
    }

    #[test]
    fn test_remove_reader_for_unknown_version_is_noop() {
        let registry = VersionGcRegistry::new(GcConfig::for_testing());
        registry.remove_reader(42);
        assert_eq!(registry.reader_count(42), 0);
        registry.add_reader(42);
        registry.remove_reader(42);
        // Extra removals must not underflow or resurrect the entry.
        registry.remove_reader(42);
        assert_eq!(registry.reader_count(42), 0);
    }

    #[test]
    fn test_reader_guard() {
        let config = GcConfig::for_testing();
        let registry = VersionGcRegistry::new(config);
        assert_eq!(registry.reader_count(1), 0);
        {
            let _guard = ReaderGuard::new(1, Arc::clone(&registry));
            assert_eq!(registry.reader_count(1), 1);
        }
        assert_eq!(registry.reader_count(1), 0);
    }

    #[test]
    fn test_gc_candidate() {
        let config = GcConfig::for_testing();
        let registry = VersionGcRegistry::new(config);
        registry.add_gc_candidate(1, 100, 50);
        registry.add_gc_candidate(2, 200, 75);
        assert_eq!(registry.pending_count(), 2);
        let pending = registry.pending_versions();
        assert!(pending.contains(&1));
        assert!(pending.contains(&2));
    }

    #[test]
    fn test_gc_cycle_respects_readers() {
        let config = GcConfig {
            grace_period: Duration::from_millis(0),
            min_retained_versions: 0,
            ..GcConfig::for_testing()
        };
        let registry = VersionGcRegistry::new(config);
        registry.add_gc_candidate(1, 100, 50);
        registry.add_reader(1);
        registry.record_modification();
        let collected = registry.run_gc_cycle_internal();
        assert!(collected.is_empty());
        assert_eq!(registry.stats().versions_skipped, 1);
        registry.remove_reader(1);
        registry.record_modification();
        let collected = registry.run_gc_cycle_internal();
        assert_eq!(collected, vec![1]);
    }

    #[test]
    fn test_gc_cycle_respects_grace_period() {
        let config = GcConfig {
            grace_period: Duration::from_secs(10),
            min_retained_versions: 0,
            ..GcConfig::for_testing()
        };
        let registry = VersionGcRegistry::new(config);
        registry.add_gc_candidate(1, 100, 50);
        registry.record_modification();
        let collected = registry.run_gc_cycle_internal();
        assert!(collected.is_empty());
    }

    #[test]
    fn test_gc_cycle_respects_retention() {
        let config = GcConfig {
            grace_period: Duration::from_millis(0),
            min_retained_versions: 3,
            ..GcConfig::for_testing()
        };
        let registry = VersionGcRegistry::new(config);
        for i in 1..=4 {
            registry.add_gc_candidate(i, i * 100, i * 50);
        }
        registry.record_modification();
        let collected = registry.run_gc_cycle_internal();
        assert_eq!(collected.len(), 1);
        assert_eq!(registry.pending_count(), 3);
    }

    #[test]
    fn test_gc_cycle_caps_collections_per_cycle() {
        // `for_testing` caps a cycle at 5 collections; the oldest go first.
        let config = GcConfig {
            grace_period: Duration::from_millis(0),
            min_retained_versions: 0,
            ..GcConfig::for_testing()
        };
        let registry = VersionGcRegistry::new(config);
        for i in 1..=7 {
            registry.add_gc_candidate(i, i * 100, i);
        }
        registry.record_modification();
        let collected = registry.run_gc_cycle_internal();
        assert_eq!(collected, vec![1, 2, 3, 4, 5]);
        assert_eq!(registry.pending_versions(), vec![6, 7]);
    }

    #[test]
    fn test_gc_stats() {
        let config = GcConfig {
            grace_period: Duration::from_millis(0),
            min_retained_versions: 0,
            ..GcConfig::for_testing()
        };
        let registry = VersionGcRegistry::new(config);
        registry.add_gc_candidate(1, 100, 50);
        registry.record_modification();
        registry.run_gc_cycle_internal();
        let stats = registry.stats();
        assert_eq!(stats.cycles_run, 1);
        assert_eq!(stats.versions_collected, 1);
        assert!(stats.bytes_reclaimed > 0);
    }

    #[test]
    fn test_no_gc_without_modifications() {
        let config = GcConfig::for_testing();
        let registry = VersionGcRegistry::new(config);
        registry.add_gc_candidate(1, 100, 50);
        let collected = registry.run_gc_cycle_internal();
        assert!(collected.is_empty());
    }

    #[test]
    fn test_shutdown() {
        let config = GcConfig {
            background_gc: true,
            gc_interval: Duration::from_millis(10),
            ..GcConfig::for_testing()
        };
        let registry = VersionGcRegistry::new(config);
        thread::sleep(Duration::from_millis(50));
        registry.shutdown();
        assert!(registry.terminating.load(Ordering::Acquire));
        assert!(registry.worker_handle.read().is_none());
        // A second shutdown must be a harmless no-op.
        registry.shutdown();
    }
}