use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use rayon::prelude::*;
use tokio::time::{Duration, Instant};
use crate::common::{Address, Key, Result, RsKvError, get_page};
use crate::hlog::HybridLog;
use crate::index::SharedMemHashIndex;
pub struct GcState {
in_progress: AtomicBool,
#[allow(dead_code)]
target_begin_address: AtomicU64,
hlog: Arc<HybridLog>,
index: SharedMemHashIndex,
last_stats: parking_lot::Mutex<Option<GcStats>>,
entries_processed: AtomicUsize,
entries_removed: AtomicUsize,
}
#[derive(Debug, Clone)]
pub struct GcStats {
pub initial_begin_address: Address,
pub new_begin_address: Address,
pub bytes_reclaimed: u64,
pub entries_processed: usize,
pub entries_removed: usize,
pub duration: Duration,
pub start_time: Instant,
}
#[derive(Debug, Clone)]
pub struct GcConfig {
pub min_reclaim_bytes: u64,
pub max_batch_size: usize,
pub target_utilization: f64,
pub parallel_scan: bool,
}
impl Default for GcConfig {
fn default() -> Self {
Self {
min_reclaim_bytes: 64 * 1024 * 1024, max_batch_size: 10000,
target_utilization: 0.7, parallel_scan: true,
}
}
}
impl GcState {
pub fn new(hlog: Arc<HybridLog>, index: SharedMemHashIndex) -> Self {
Self {
in_progress: AtomicBool::new(false),
target_begin_address: AtomicU64::new(0),
hlog,
index,
last_stats: parking_lot::Mutex::new(None),
entries_processed: AtomicUsize::new(0),
entries_removed: AtomicUsize::new(0),
}
}
pub fn is_in_progress(&self) -> bool {
self.in_progress.load(Ordering::Acquire)
}
pub fn last_stats(&self) -> Option<GcStats> {
self.last_stats.lock().clone()
}
pub async fn initiate_gc(&self, config: GcConfig) -> Result<GcStats> {
if self
.in_progress
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Err(RsKvError::GarbageCollectionFailed {
message: "Garbage collection already in progress".to_string(),
});
}
let start_time = Instant::now();
log::info!("Initiating garbage collection with config: {config:?}");
self.entries_processed.store(0, Ordering::Release);
self.entries_removed.store(0, Ordering::Release);
let initial_begin = self.hlog.get_begin_address();
let current_head = self.hlog.get_head_address();
let new_begin = self.calculate_new_begin_address(&config, initial_begin, current_head)?;
if new_begin <= initial_begin {
log::info!("No garbage collection needed");
self.in_progress.store(false, Ordering::Release);
return Ok(GcStats {
initial_begin_address: initial_begin,
new_begin_address: initial_begin,
bytes_reclaimed: 0,
entries_processed: 0,
entries_removed: 0,
duration: start_time.elapsed(),
start_time,
});
}
log::info!(
"Moving begin address from 0x{:x} to 0x{:x}",
initial_begin,
new_begin
);
let (entries_processed, entries_removed) =
self.cleanup_index_entries(new_begin, &config).await?;
let actual_bytes_reclaimed = self.hlog.advance_begin_address(new_begin)?;
log::info!(
"Cleaned {} entries, removed {}, reclaimed {} bytes",
entries_processed,
entries_removed,
actual_bytes_reclaimed
);
let bytes_reclaimed = new_begin.saturating_sub(initial_begin);
let stats = GcStats {
initial_begin_address: initial_begin,
new_begin_address: new_begin,
bytes_reclaimed,
entries_processed,
entries_removed,
duration: start_time.elapsed(),
start_time,
};
*self.last_stats.lock() = Some(stats.clone());
log::info!(
"Garbage collection completed in {:?}, reclaimed {} bytes",
stats.duration,
bytes_reclaimed
);
self.in_progress.store(false, Ordering::Release);
Ok(stats)
}
fn calculate_new_begin_address(
&self,
config: &GcConfig,
current_begin: Address,
current_head: Address,
) -> Result<Address> {
let available_space = current_head.saturating_sub(current_begin);
if available_space < config.min_reclaim_bytes {
return Ok(current_begin);
}
let target_reclaim = (available_space as f64 * (1.0 - config.target_utilization)) as u64;
let new_begin = current_begin + target_reclaim.min(available_space);
let new_begin_page = get_page(new_begin);
let aligned_begin = crate::common::make_address(new_begin_page, 0);
Ok(aligned_begin.min(current_head))
}
async fn cleanup_index_entries(
&self,
new_begin_address: Address,
config: &GcConfig,
) -> Result<(usize, usize)> {
log::debug!(
"Cleaning up index entries older than address 0x{:x}",
new_begin_address
);
if config.parallel_scan {
self.parallel_cleanup_index(new_begin_address, config).await
} else {
self.sequential_cleanup_index(new_begin_address, config)
.await
}
}
async fn parallel_cleanup_index(
&self,
new_begin_address: Address,
_config: &GcConfig,
) -> Result<(usize, usize)> {
let all_entries = self.index.snapshot();
let total_entries = all_entries.len();
log::debug!("Scanning {total_entries} index entries in parallel");
let stale_keys: Vec<Key> = all_entries
.par_iter()
.filter_map(|(key, address)| {
if *address < new_begin_address {
Some(key.clone())
} else {
None
}
})
.collect();
let entries_to_remove = stale_keys.len();
for key in stale_keys {
self.index.remove_if_address(&key, new_begin_address);
}
self.entries_processed
.store(total_entries, Ordering::Release);
self.entries_removed
.store(entries_to_remove, Ordering::Release);
Ok((total_entries, entries_to_remove))
}
async fn sequential_cleanup_index(
&self,
new_begin_address: Address,
config: &GcConfig,
) -> Result<(usize, usize)> {
let mut entries_processed = 0;
let mut entries_removed = 0;
let mut batch = Vec::new();
self.index.for_each(|key, address| {
batch.push((key.clone(), address));
if batch.len() >= config.max_batch_size {
let (processed, removed) = self.process_batch(&batch, new_begin_address);
entries_processed += processed;
entries_removed += removed;
batch.clear();
}
});
if !batch.is_empty() {
let (processed, removed) = self.process_batch(&batch, new_begin_address);
entries_processed += processed;
entries_removed += removed;
}
self.entries_processed
.store(entries_processed, Ordering::Release);
self.entries_removed
.store(entries_removed, Ordering::Release);
Ok((entries_processed, entries_removed))
}
fn process_batch(
&self,
batch: &[(Key, Address)],
new_begin_address: Address,
) -> (usize, usize) {
let mut removed = 0;
for (key, address) in batch {
if *address < new_begin_address {
if self.index.remove_if_address(key, *address) {
removed += 1;
}
}
}
(batch.len(), removed)
}
pub fn estimate_reclaimable_space(&self) -> Result<GcEstimate> {
let current_begin = self.hlog.get_begin_address();
let current_head = self.hlog.get_head_address();
let current_tail = self.hlog.get_tail_address();
let mut entries_in_disk_region = 0;
let mut entries_in_memory_region = 0;
let mut total_entries = 0;
self.index.for_each(|_key, address| {
total_entries += 1;
if address < current_head {
entries_in_disk_region += 1;
} else {
entries_in_memory_region += 1;
}
});
let disk_region_size = current_head.saturating_sub(current_begin);
let memory_region_size = current_tail.saturating_sub(current_head);
Ok(GcEstimate {
total_log_size: current_tail.saturating_sub(current_begin),
disk_region_size,
memory_region_size,
reclaimable_space: disk_region_size,
total_index_entries: total_entries,
entries_in_disk_region,
entries_in_memory_region,
})
}
pub fn should_run_gc(&self, config: &GcConfig) -> Result<bool> {
let estimate = self.estimate_reclaimable_space()?;
Ok(estimate.reclaimable_space >= config.min_reclaim_bytes)
}
}
#[derive(Debug, Clone)]
pub struct GcEstimate {
pub total_log_size: u64,
pub disk_region_size: u64,
pub memory_region_size: u64,
pub reclaimable_space: u64,
pub total_index_entries: usize,
pub entries_in_disk_region: usize,
pub entries_in_memory_region: usize,
}
trait ConditionalRemoval {
fn remove_if_address(&self, key: &Key, threshold_address: Address) -> bool;
}
impl ConditionalRemoval for SharedMemHashIndex {
fn remove_if_address(&self, key: &Key, threshold_address: Address) -> bool {
if let Some(address) = self.find(key)
&& address < threshold_address
{
return self.remove_if_address(key, address);
}
false
}
}
#[cfg(test)]
mod tests {
use tempfile::tempdir;
use super::*;
use crate::epoch::EpochManager;
use crate::hlog::FileStorageDevice;
use crate::index::new_shared_mem_hash_index;
async fn create_test_gc_state() -> (GcState, tempfile::TempDir) {
let temp_dir = tempdir().unwrap();
let epoch = Arc::new(EpochManager::new());
let storage = Box::new(FileStorageDevice::new(temp_dir.path().join("test.log")).unwrap());
let hlog = Arc::new(HybridLog::new(64 * 1024 * 1024, storage, epoch.clone()).unwrap());
let index = new_shared_mem_hash_index(epoch);
let gc_state = GcState::new(hlog, index);
(gc_state, temp_dir)
}
#[tokio::test]
async fn test_gc_estimate() {
let (gc_state, _temp_dir) = create_test_gc_state().await;
gc_state.index.insert(b"key1".to_vec(), 1000);
gc_state.index.insert(b"key2".to_vec(), 2000);
gc_state.index.insert(b"key3".to_vec(), 3000);
let estimate = gc_state.estimate_reclaimable_space().unwrap();
assert_eq!(estimate.total_index_entries, 3);
}
#[tokio::test]
async fn test_gc_should_run() {
let (gc_state, _temp_dir) = create_test_gc_state().await;
let config = GcConfig {
min_reclaim_bytes: 100, ..Default::default()
};
let should_run = gc_state.should_run_gc(&config).unwrap();
assert!(!should_run);
}
#[tokio::test]
async fn test_gc_basic_operation() {
let (gc_state, _temp_dir) = create_test_gc_state().await;
gc_state.index.insert(b"old_key1".to_vec(), 100);
gc_state.index.insert(b"old_key2".to_vec(), 200);
gc_state.index.insert(b"new_key1".to_vec(), 10000);
let config = GcConfig {
min_reclaim_bytes: 0, target_utilization: 0.5, ..Default::default()
};
let stats = gc_state.initiate_gc(config).await.unwrap();
assert!(!gc_state.is_in_progress());
if let Some(last_stats) = gc_state.last_stats() {
assert_eq!(last_stats.entries_processed, stats.entries_processed);
}
}
#[tokio::test]
async fn test_gc_concurrent_prevention() {
let (gc_state, _temp_dir) = create_test_gc_state().await;
let config = GcConfig::default();
let _first_result = gc_state.initiate_gc(config.clone()).await;
gc_state.in_progress.store(true, Ordering::Release);
let second_result = gc_state.initiate_gc(config).await;
assert!(second_result.is_err());
assert!(matches!(
second_result,
Err(RsKvError::GarbageCollectionFailed { .. })
));
gc_state.in_progress.store(false, Ordering::Release);
}
#[tokio::test]
async fn test_parallel_vs_sequential_cleanup() {
let (gc_state, _temp_dir) = create_test_gc_state().await;
for i in 0..100 {
gc_state
.index
.insert(format!("key_{}", i).into_bytes(), i as u64);
}
let new_begin = 50;
let config_parallel = GcConfig {
parallel_scan: true,
..Default::default()
};
let (processed_par, removed_par) = gc_state
.parallel_cleanup_index(new_begin, &config_parallel)
.await
.unwrap();
for i in 0..removed_par {
gc_state
.index
.insert(format!("key_{}", i).into_bytes(), i as u64);
}
let config_sequential = GcConfig {
parallel_scan: false,
max_batch_size: 10,
..Default::default()
};
let (processed_seq, _removed_seq) = gc_state
.sequential_cleanup_index(new_begin, &config_sequential)
.await
.unwrap();
assert!(processed_par > 0);
assert!(processed_seq > 0);
}
}