use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::mvcc::tx_manager::TxManager;
use crate::mvcc::version_chain::VersionChainManager;
use crate::types::{MvccTxStatus, Timestamp};
/// Default value of [`GcConfig::max_chain_depth`].
pub const DEFAULT_MAX_CHAIN_DEPTH: usize = 10;
/// Default value of [`GcConfig::interval_ms`], in milliseconds.
pub const DEFAULT_GC_INTERVAL_MS: u64 = 5_000;
/// Default value of [`GcConfig::retention_ms`], in milliseconds.
pub const DEFAULT_RETENTION_MS: u64 = 60_000;

/// Cumulative counters maintained by the garbage collector across passes.
#[derive(Debug, Clone, Default)]
pub struct GcStats {
    /// Sum of the counts returned by version pruning.
    pub versions_pruned: u64,
    /// Sum of the counts returned by chain-depth truncation.
    pub chains_truncated: u64,
    /// Number of completed (non-skipped) passes.
    pub gc_runs: u64,
    /// Wall-clock time of the latest pass in milliseconds since the Unix epoch; 0 until then.
    pub last_gc_time: u64,
    /// Total committed transactions removed from the transaction manager.
    pub txs_cleaned: u64,
}

/// Tuning parameters for the garbage collector.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Interval between passes, in milliseconds (used by `should_run` and the background worker).
    pub interval_ms: u64,
    /// Retention window, in milliseconds, handed to the transaction manager to derive a horizon.
    pub retention_ms: u64,
    /// Depth limit handed to version-chain truncation.
    pub max_chain_depth: usize,
}

impl Default for GcConfig {
    /// Builds a configuration from the `DEFAULT_*` constants above.
    fn default() -> Self {
        GcConfig {
            max_chain_depth: DEFAULT_MAX_CHAIN_DEPTH,
            retention_ms: DEFAULT_RETENTION_MS,
            interval_ms: DEFAULT_GC_INTERVAL_MS,
        }
    }
}
#[derive(Debug)]
pub struct GarbageCollector {
config: GcConfig,
stats: GcStats,
running: AtomicBool,
last_run: Option<Instant>,
}
impl GarbageCollector {
pub fn new() -> Self {
Self::with_config(GcConfig::default())
}
pub fn with_config(config: GcConfig) -> Self {
Self {
config,
stats: GcStats::default(),
running: AtomicBool::new(false),
last_run: None,
}
}
pub fn config(&self) -> &GcConfig {
&self.config
}
pub fn set_config(&mut self, config: GcConfig) {
self.config = config;
}
pub fn run_gc(
&mut self,
tx_manager: &mut TxManager,
version_chain: &mut VersionChainManager,
) -> GcResult {
if self.running.swap(true, Ordering::SeqCst) {
return GcResult {
versions_pruned: 0,
chains_truncated: 0,
txs_cleaned: 0,
skipped: true,
};
}
let result = self.do_gc(tx_manager, version_chain);
self.running.store(false, Ordering::SeqCst);
self.last_run = Some(Instant::now());
result
}
fn do_gc(
&mut self,
tx_manager: &mut TxManager,
version_chain: &mut VersionChainManager,
) -> GcResult {
let min_active_ts = tx_manager.min_active_ts();
let retention_horizon_ts = tx_manager.get_retention_horizon_ts(self.config.retention_ms);
let horizon_ts = min_active_ts.min(retention_horizon_ts);
let pruned = version_chain.prune_old_versions(horizon_ts);
let truncated =
version_chain.truncate_deep_chains(self.config.max_chain_depth, Some(min_active_ts));
let txs_cleaned = self.cleanup_old_transactions(tx_manager, horizon_ts);
tx_manager.prune_wall_clock_mappings(horizon_ts);
self.stats.versions_pruned += pruned as u64;
self.stats.chains_truncated += truncated as u64;
self.stats.txs_cleaned += txs_cleaned as u64;
self.stats.gc_runs += 1;
self.stats.last_gc_time = current_time_ms();
GcResult {
versions_pruned: pruned,
chains_truncated: truncated,
txs_cleaned,
skipped: false,
}
}
fn cleanup_old_transactions(&self, tx_manager: &mut TxManager, horizon_ts: Timestamp) -> usize {
let txs_to_remove: Vec<_> = tx_manager
.get_all_txs()
.filter(|(_, tx)| {
tx.status == MvccTxStatus::Committed
&& tx.commit_ts.is_some()
&& tx.commit_ts.unwrap() < horizon_ts
})
.map(|(&txid, _)| txid)
.collect();
let count = txs_to_remove.len();
for txid in txs_to_remove {
tx_manager.remove_tx(txid);
}
count
}
pub fn should_run(&self) -> bool {
match self.last_run {
None => true,
Some(last) => last.elapsed() >= Duration::from_millis(self.config.interval_ms),
}
}
pub fn force_gc(
&mut self,
tx_manager: &mut TxManager,
version_chain: &mut VersionChainManager,
) -> usize {
let result = self.run_gc(tx_manager, version_chain);
result.versions_pruned
}
pub fn get_stats(&self) -> GcStats {
self.stats.clone()
}
pub fn reset_stats(&mut self) {
self.stats = GcStats::default();
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
}
impl Default for GarbageCollector {
fn default() -> Self {
Self::new()
}
}
/// Outcome of a single garbage-collection pass (`GarbageCollector::run_gc`).
#[derive(Debug, Clone, Default)]
pub struct GcResult {
    /// Count returned by version pruning during this pass.
    pub versions_pruned: usize,
    /// Count returned by chain-depth truncation during this pass.
    pub chains_truncated: usize,
    /// Number of committed transactions removed from the transaction manager.
    pub txs_cleaned: usize,
    /// `true` when the pass was not performed because the collector was already marked as
    /// running; all counts are zero in that case.
    pub skipped: bool,
}
/// Stop flag and counters shared between a background GC worker and its handle.
///
/// `Default` yields a cleared stop flag and zeroed counters (the atomics' own defaults).
#[derive(Debug, Default)]
pub struct SharedGcState {
    /// Set to ask the worker to stop.
    pub stop_signal: AtomicBool,
    /// Running total of versions pruned, as reported through [`SharedGcState::record_gc_run`].
    pub versions_pruned: AtomicU64,
    /// Number of passes reported through [`SharedGcState::record_gc_run`].
    pub gc_runs: AtomicU64,
}

impl SharedGcState {
    /// Creates a state with the stop flag cleared and both counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Requests that the worker stop.
    pub fn stop(&self) {
        self.stop_signal.store(true, Ordering::SeqCst);
    }

    /// Returns whether a stop has been requested.
    pub fn should_stop(&self) -> bool {
        self.stop_signal.load(Ordering::SeqCst)
    }

    /// Records one completed pass that pruned `pruned` versions.
    ///
    /// The counters are independent statistics, so relaxed ordering is sufficient.
    pub fn record_gc_run(&self, pruned: u64) {
        let (total_pruned, runs) = (&self.versions_pruned, &self.gc_runs);
        total_pruned.fetch_add(pruned, Ordering::Relaxed);
        runs.fetch_add(1, Ordering::Relaxed);
    }
}
#[cfg(not(target_arch = "wasm32"))]
use std::thread::{self, JoinHandle};
/// Handle to the background GC worker thread.
///
/// Stopping it, explicitly via [`BackgroundGcHandle::stop`] or implicitly on drop, sets the
/// shared stop flag and joins the worker thread.
#[cfg(not(target_arch = "wasm32"))]
pub struct BackgroundGcHandle {
    state: Arc<SharedGcState>,
    thread: Option<JoinHandle<()>>,
}

#[cfg(not(target_arch = "wasm32"))]
impl BackgroundGcHandle {
    /// Requests the worker to stop and waits for it to exit.
    pub fn stop(mut self) {
        self.shutdown();
    }

    /// Returns `true` while a worker thread is attached and has not finished.
    pub fn is_running(&self) -> bool {
        match &self.thread {
            Some(worker) => !worker.is_finished(),
            None => false,
        }
    }

    /// Number of passes the worker has reported.
    pub fn gc_runs(&self) -> u64 {
        self.state.gc_runs.load(Ordering::Relaxed)
    }

    /// Total versions pruned, as reported by the worker.
    pub fn versions_pruned(&self) -> u64 {
        self.state.versions_pruned.load(Ordering::Relaxed)
    }

    /// Shared state also held by the worker.
    pub fn state(&self) -> &Arc<SharedGcState> {
        &self.state
    }

    /// Sets the stop flag and joins the worker if it has not been joined yet.
    ///
    /// The join result is discarded, so a panic in the worker is not propagated here.
    fn shutdown(&mut self) {
        self.state.stop();
        if let Some(worker) = self.thread.take() {
            let _ = worker.join();
        }
    }
}

#[cfg(not(target_arch = "wasm32"))]
impl Drop for BackgroundGcHandle {
    fn drop(&mut self) {
        self.shutdown();
    }
}
/// wasm32 stand-in for the background GC handle.
///
/// No worker thread exists on this target, so the handle only carries the shared state.
#[cfg(target_arch = "wasm32")]
pub struct BackgroundGcHandle {
    state: Arc<SharedGcState>,
}

#[cfg(target_arch = "wasm32")]
impl BackgroundGcHandle {
    /// Sets the stop flag; there is no thread to join.
    pub fn stop(self) {
        let Self { state } = self;
        state.stop();
    }

    /// Always `false`: no worker runs on wasm32.
    pub fn is_running(&self) -> bool {
        false
    }

    /// Number of passes recorded in the shared state.
    pub fn gc_runs(&self) -> u64 {
        let shared: &SharedGcState = &self.state;
        shared.gc_runs.load(Ordering::Relaxed)
    }

    /// Total versions pruned recorded in the shared state.
    pub fn versions_pruned(&self) -> u64 {
        let shared: &SharedGcState = &self.state;
        shared.versions_pruned.load(Ordering::Relaxed)
    }

    /// The shared state backing this handle.
    pub fn state(&self) -> &Arc<SharedGcState> {
        &self.state
    }
}
#[cfg(not(target_arch = "wasm32"))]
pub fn start_background_gc(
tx_manager: Arc<parking_lot::Mutex<TxManager>>,
version_chain: Arc<parking_lot::Mutex<VersionChainManager>>,
config: GcConfig,
) -> BackgroundGcHandle {
let state = Arc::new(SharedGcState::new());
let state_clone = state.clone();
let interval = Duration::from_millis(config.interval_ms);
let thread = thread::spawn(move || {
let mut gc = GarbageCollector::with_config(config);
while !state_clone.should_stop() {
thread::sleep(interval);
if state_clone.should_stop() {
break;
}
let mut tx_mgr = tx_manager.lock();
let mut vc = version_chain.lock();
let result = gc.run_gc(&mut tx_mgr, &mut vc);
state_clone.record_gc_run(result.versions_pruned as u64);
}
});
BackgroundGcHandle {
state,
thread: Some(thread),
}
}
/// wasm32 variant: no thread is spawned and no passes are run.
///
/// The arguments are accepted for API parity and ignored. The returned handle starts with
/// fresh shared state.
#[cfg(target_arch = "wasm32")]
pub fn start_background_gc(
    _tx_manager: Arc<parking_lot::Mutex<TxManager>>,
    _version_chain: Arc<parking_lot::Mutex<VersionChainManager>>,
    _config: GcConfig,
) -> BackgroundGcHandle {
    let state = Arc::new(SharedGcState::default());
    BackgroundGcHandle { state }
}
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{NodeDelta, NodeVersionData};

    /// Fresh managers plus a collector with the default configuration.
    fn setup() -> (TxManager, VersionChainManager, GarbageCollector) {
        (
            TxManager::new(),
            VersionChainManager::new(),
            GarbageCollector::new(),
        )
    }

    /// An empty delta for node 1, the only node these tests touch.
    fn node_one_data() -> NodeVersionData {
        NodeVersionData {
            node_id: 1,
            delta: NodeDelta::default(),
        }
    }

    /// Asserts every field of `config`.
    #[track_caller]
    fn assert_config(config: &GcConfig, interval_ms: u64, retention_ms: u64, max_chain_depth: usize) {
        assert_eq!(config.interval_ms, interval_ms);
        assert_eq!(config.retention_ms, retention_ms);
        assert_eq!(config.max_chain_depth, max_chain_depth);
    }

    #[test]
    fn test_gc_new() {
        let gc = GarbageCollector::new();
        assert_config(
            &gc.config,
            DEFAULT_GC_INTERVAL_MS,
            DEFAULT_RETENTION_MS,
            DEFAULT_MAX_CHAIN_DEPTH,
        );
    }

    #[test]
    fn test_gc_with_config() {
        let config = GcConfig {
            interval_ms: 1000,
            retention_ms: 5000,
            max_chain_depth: 5,
        };
        let gc = GarbageCollector::with_config(config.clone());
        assert_config(&gc.config, 1000, 5000, 5);
    }

    #[test]
    fn test_gc_run_empty() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        let outcome = gc.run_gc(&mut tx_mgr, &mut version_chain);
        assert!(!outcome.skipped);
        assert_eq!(outcome.versions_pruned, 0);
        assert_eq!(outcome.chains_truncated, 0);
    }

    #[test]
    fn test_gc_prunes_old_versions() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        gc.config.retention_ms = 0;
        for ts in 1..=5 {
            version_chain.append_node_version(1, node_one_data(), ts, ts);
        }
        gc.run_gc(&mut tx_mgr, &mut version_chain);
        assert!(gc.stats.gc_runs > 0);
    }

    #[test]
    fn test_gc_respects_active_transactions() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        gc.config.retention_ms = 0;
        let (_txid, _start_ts) = tx_mgr.begin_tx();
        version_chain.append_node_version(1, node_one_data(), 1, 1);
        gc.run_gc(&mut tx_mgr, &mut version_chain);
        assert!(version_chain.get_node_version(1).is_some());
    }

    #[test]
    fn test_gc_truncates_deep_chains() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        gc.config.max_chain_depth = 3;
        gc.config.retention_ms = u64::MAX;
        for i in 1..=10 {
            version_chain.append_node_version(1, node_one_data(), i, u64::MAX - i);
        }
        gc.run_gc(&mut tx_mgr, &mut version_chain);
        // Walk the chain from the newest version through `prev` links.
        let depth = std::iter::successors(version_chain.get_node_version(1), |version| {
            version.prev.as_deref()
        })
        .count();
        assert!(depth <= gc.config.max_chain_depth + 1);
    }

    #[test]
    fn test_gc_cleans_committed_transactions() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        gc.config.retention_ms = 0;
        for _ in 0..5 {
            let (txid, _) = tx_mgr.begin_tx();
            tx_mgr.commit_tx(txid).expect("commit of a fresh transaction");
        }
        let (_txid, _) = tx_mgr.begin_tx();
        gc.run_gc(&mut tx_mgr, &mut version_chain);
        assert!(gc.stats.gc_runs > 0);
    }

    #[test]
    fn test_gc_stats() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        for _ in 0..3 {
            gc.run_gc(&mut tx_mgr, &mut version_chain);
        }
        assert_eq!(gc.get_stats().gc_runs, 3);
    }

    #[test]
    fn test_gc_reset_stats() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        gc.run_gc(&mut tx_mgr, &mut version_chain);
        gc.reset_stats();
        let snapshot = gc.get_stats();
        assert_eq!(snapshot.gc_runs, 0);
        assert_eq!(snapshot.versions_pruned, 0);
    }

    #[test]
    fn test_should_run() {
        assert!(GarbageCollector::new().should_run());
    }

    #[test]
    fn test_is_running() {
        assert!(!GarbageCollector::new().is_running());
    }

    #[test]
    fn test_shared_gc_state() {
        let shared = SharedGcState::new();
        assert!(!shared.should_stop());
        shared.record_gc_run(10);
        assert_eq!(shared.versions_pruned.load(Ordering::Relaxed), 10);
        assert_eq!(shared.gc_runs.load(Ordering::Relaxed), 1);
        shared.stop();
        assert!(shared.should_stop());
    }

    #[test]
    fn test_gc_result_default() {
        let GcResult {
            versions_pruned,
            chains_truncated,
            txs_cleaned,
            skipped,
        } = GcResult::default();
        assert_eq!(versions_pruned, 0);
        assert_eq!(chains_truncated, 0);
        assert_eq!(txs_cleaned, 0);
        assert!(!skipped);
    }

    #[test]
    fn test_gc_config_default() {
        assert_config(
            &GcConfig::default(),
            DEFAULT_GC_INTERVAL_MS,
            DEFAULT_RETENTION_MS,
            DEFAULT_MAX_CHAIN_DEPTH,
        );
    }

    #[test]
    fn test_force_gc() {
        let (mut tx_mgr, mut version_chain, mut gc) = setup();
        let _pruned = gc.force_gc(&mut tx_mgr, &mut version_chain);
        assert!(gc.stats.gc_runs > 0);
    }

    #[test]
    fn test_set_config() {
        let mut gc = GarbageCollector::new();
        gc.set_config(GcConfig {
            interval_ms: 100,
            retention_ms: 200,
            max_chain_depth: 3,
        });
        assert_config(gc.config(), 100, 200, 3);
    }
}