use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{Duration, MissedTickBehavior, interval};
use crate::checkpoint::CheckpointState;
use crate::common::{Config, Result, RsKvError};
use crate::gc::{GcConfig, GcState};
use crate::hlog::HybridLog;
/// Owns and supervises the periodic background tasks (checkpointing, garbage
/// collection, and hybrid-log maintenance) for a store instance.
pub struct BackgroundTaskManager {
    /// Shared shutdown flag; spawned tasks poll it and exit their loops once cleared.
    running: Arc<AtomicBool>,
    /// Store configuration; read for enable flags and task intervals.
    config: Config,
    /// Checkpoint machinery driven by the periodic checkpoint task.
    checkpoint_state: Arc<CheckpointState>,
    /// GC machinery driven by the periodic GC task.
    gc_state: Arc<GcState>,
    /// Hybrid log maintained by the log maintenance task.
    hlog: Arc<HybridLog>,
    /// Coordination lock: background checkpoints take it exclusively (write),
    /// GC and log maintenance take it shared (read); all via try_* so a held
    /// lock causes the round to be skipped rather than blocked on.
    operation_lock: Arc<AsyncRwLock<()>>,
    /// Join handles of the spawned tasks; drained by stop()/Drop.
    task_handles: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
impl BackgroundTaskManager {
pub fn new(
config: Config,
checkpoint_state: Arc<CheckpointState>,
gc_state: Arc<GcState>,
hlog: Arc<HybridLog>,
operation_lock: Arc<AsyncRwLock<()>>,
) -> Self {
Self {
running: Arc::new(AtomicBool::new(false)),
config,
checkpoint_state,
gc_state,
hlog,
operation_lock,
task_handles: parking_lot::Mutex::new(Vec::new()),
}
}
pub fn start(&self) -> Result<()> {
if self
.running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Err(RsKvError::Internal {
message: "Background tasks are already running".to_string(),
});
}
log::info!("Starting background task manager");
let mut handles = self.task_handles.lock();
if self.config.enable_checkpointing {
let handle = self.start_checkpoint_task();
handles.push(handle);
}
if self.config.enable_gc {
let handle = self.start_gc_task();
handles.push(handle);
}
let handle = self.start_log_maintenance_task();
handles.push(handle);
log::info!("Started {} background tasks", handles.len());
Ok(())
}
pub async fn stop(&self) -> Result<()> {
if !self.running.swap(false, Ordering::AcqRel) {
return Ok(()); }
log::info!("Stopping background tasks");
let handles = {
let mut handles = self.task_handles.lock();
std::mem::take(&mut *handles)
};
for handle in handles {
handle.abort();
let _ = handle.await; }
log::info!("All background tasks stopped");
Ok(())
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Acquire)
}
fn start_checkpoint_task(&self) -> tokio::task::JoinHandle<()> {
let running = self.running.clone();
let checkpoint_state = self.checkpoint_state.clone();
let operation_lock = self.operation_lock.clone();
let interval_ms = self.config.checkpoint_interval_ms;
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(interval_ms));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
log::info!("Checkpoint task started with interval {interval_ms}ms");
while running.load(Ordering::Acquire) {
interval.tick().await;
if !running.load(Ordering::Acquire) {
break;
}
if let Ok(_lock) = operation_lock.try_write() {
match checkpoint_state.initiate_checkpoint().await {
Ok(metadata) => {
log::debug!(
"Background checkpoint {} completed",
metadata.checkpoint_id
);
}
Err(e) => {
log::warn!("Background checkpoint failed: {e}");
}
}
} else {
log::debug!("Skipping checkpoint - manual operation in progress");
}
}
log::info!("Checkpoint task stopped");
})
}
fn start_gc_task(&self) -> tokio::task::JoinHandle<()> {
let running = self.running.clone();
let gc_state = self.gc_state.clone();
let operation_lock = self.operation_lock.clone();
let interval_ms = self.config.gc_interval_ms;
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(interval_ms));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
log::info!("GC task started with interval {interval_ms}ms");
while running.load(Ordering::Acquire) {
interval.tick().await;
if !running.load(Ordering::Acquire) {
break;
}
let gc_config = GcConfig::default();
match gc_state.should_run_gc(&gc_config) {
Ok(true) => {
if let Ok(_lock) = operation_lock.try_read() {
match gc_state.initiate_gc(gc_config).await {
Ok(stats) => {
log::debug!(
"Background GC completed, reclaimed {} bytes",
stats.bytes_reclaimed
);
}
Err(e) => {
log::warn!("Background GC failed: {e}");
}
}
} else {
log::debug!("Skipping GC - manual operation in progress");
}
}
Ok(false) => {
log::trace!("GC not needed");
}
Err(e) => {
log::warn!("Failed to check GC requirement: {e}");
}
}
}
log::info!("GC task stopped");
})
}
fn start_log_maintenance_task(&self) -> tokio::task::JoinHandle<()> {
let running = self.running.clone();
let hlog = self.hlog.clone();
let operation_lock = self.operation_lock.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(30)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
log::info!("Log maintenance task started");
while running.load(Ordering::Acquire) {
interval.tick().await;
if !running.load(Ordering::Acquire) {
break;
}
if let Ok(_lock) = operation_lock.try_read() {
Self::perform_log_maintenance(&hlog).await;
}
}
log::info!("Log maintenance task stopped");
})
}
async fn perform_log_maintenance(hlog: &HybridLog) {
let tail_address = hlog.get_tail_address();
let read_only_address = hlog.get_read_only_address();
let head_address = hlog.get_head_address();
let mutable_region_size = tail_address.saturating_sub(read_only_address);
const MAX_MUTABLE_REGION_SIZE: u64 = 128 * 1024 * 1024;
if mutable_region_size > MAX_MUTABLE_REGION_SIZE {
let new_read_only = hlog.shift_read_only_address();
log::debug!("Advanced read-only address to 0x{new_read_only:x}");
if let Err(e) = hlog.flush_to_disk(new_read_only).await {
log::warn!("Failed to flush during maintenance: {e}");
}
}
let read_only_region_size = read_only_address.saturating_sub(head_address);
const MAX_READ_ONLY_REGION_SIZE: u64 = 256 * 1024 * 1024;
if read_only_region_size > MAX_READ_ONLY_REGION_SIZE {
let new_head = head_address + (read_only_region_size / 2); if let Err(e) = hlog.shift_head_address(new_head) {
log::warn!("Failed to shift head address during maintenance: {e}");
} else {
log::debug!("Advanced head address to 0x{new_head:x}");
}
}
}
pub fn get_stats(&self) -> BackgroundTaskStats {
BackgroundTaskStats {
is_running: self.is_running(),
checkpoint_enabled: self.config.enable_checkpointing,
gc_enabled: self.config.enable_gc,
checkpoint_interval_ms: self.config.checkpoint_interval_ms,
gc_interval_ms: self.config.gc_interval_ms,
active_task_count: self.task_handles.lock().len(),
}
}
}
impl Drop for BackgroundTaskManager {
    // Best-effort teardown: Drop cannot await, so tasks are aborted without
    // being joined. Prefer calling stop() for a graceful, awaited shutdown.
    fn drop(&mut self) {
        // No need to clone the Arc — we already have &mut self.
        self.running.store(false, Ordering::Release);
        let handles = std::mem::take(&mut *self.task_handles.lock());
        // Abort unconditionally rather than only when the flag was set:
        // after stop() the vec is already empty, and this guarantees no
        // still-tracked handle is ever leaked running.
        for handle in handles {
            handle.abort();
        }
    }
}
/// Point-in-time snapshot of the background task manager's configuration and
/// state, as returned by `BackgroundTaskManager::get_stats`.
#[derive(Debug, Clone)]
pub struct BackgroundTaskStats {
    /// Whether the background tasks were running when the snapshot was taken.
    pub is_running: bool,
    /// Whether periodic checkpointing is enabled in the config.
    pub checkpoint_enabled: bool,
    /// Whether periodic garbage collection is enabled in the config.
    pub gc_enabled: bool,
    /// Configured checkpoint period in milliseconds.
    pub checkpoint_interval_ms: u64,
    /// Configured GC period in milliseconds.
    pub gc_interval_ms: u64,
    /// Number of task handles currently tracked (0 when stopped).
    pub active_task_count: usize,
}
#[cfg(test)]
mod tests {
    use tempfile::tempdir;

    use super::*;
    use crate::checkpoint::CheckpointState;
    use crate::epoch::EpochManager;
    use crate::hlog::FileStorageDevice;
    use crate::index::new_shared_mem_hash_index;

    /// Wires up a manager against throwaway state rooted in a temp directory.
    /// The `TempDir` is returned so the backing files outlive the manager.
    async fn create_test_background_manager() -> (BackgroundTaskManager, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let config = Config {
            storage_dir: dir.path().to_string_lossy().to_string(),
            memory_size: 32 * 1024 * 1024,
            enable_checkpointing: true,
            checkpoint_interval_ms: 100,
            enable_gc: true,
            gc_interval_ms: 200,
            ..Default::default()
        };

        let epoch = Arc::new(EpochManager::new());
        let device = Box::new(FileStorageDevice::new(dir.path().join("test.log")).unwrap());
        let log = Arc::new(HybridLog::new(config.memory_size, device, epoch.clone()).unwrap());
        let index = new_shared_mem_hash_index(epoch);

        let checkpoints = Arc::new(
            CheckpointState::new(dir.path().join("checkpoints"), log.clone(), index.clone())
                .unwrap(),
        );
        let gc = Arc::new(GcState::new(log.clone(), index));
        let lock = Arc::new(AsyncRwLock::new(()));

        let manager = BackgroundTaskManager::new(config, checkpoints, gc, log, lock);
        (manager, dir)
    }

    #[tokio::test]
    async fn test_background_manager_start_stop() {
        let (manager, _dir) = create_test_background_manager().await;
        assert!(!manager.is_running());

        manager.start().unwrap();
        assert!(manager.is_running());

        manager.stop().await.unwrap();
        assert!(!manager.is_running());
    }

    #[tokio::test]
    async fn test_background_manager_double_start() {
        let (manager, _dir) = create_test_background_manager().await;
        manager.start().unwrap();
        // A second start must be rejected while the first is still active.
        assert!(manager.start().is_err());
        manager.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_background_tasks_run() {
        let (manager, _dir) = create_test_background_manager().await;
        manager.start().unwrap();
        // Let several checkpoint/GC intervals elapse while the tasks run.
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert!(manager.is_running());
        manager.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_background_manager_stats() {
        let (manager, _dir) = create_test_background_manager().await;

        let before = manager.get_stats();
        assert!(!before.is_running);
        assert_eq!(before.active_task_count, 0);

        manager.start().unwrap();
        let after = manager.get_stats();
        assert!(after.is_running);
        assert!(after.checkpoint_enabled);
        assert!(after.gc_enabled);
        assert!(after.active_task_count > 0);

        manager.stop().await.unwrap();
    }

    #[test]
    fn test_background_manager_drop() {
        tokio::runtime::Runtime::new().unwrap().block_on(async {
            let (manager, _dir) = create_test_background_manager().await;
            manager.start().unwrap();
            assert!(manager.is_running());
            // Dropping without stop() must still abort the spawned tasks.
            drop(manager);
            tokio::time::sleep(Duration::from_millis(50)).await;
        });
    }
}