use crate::backend::native::v2::wal::V2WALConfig;
use crate::backend::native::v2::wal::checkpoint::{
constants::*,
errors::*,
io::multi_file::{
CheckpointManifest, CheckpointSegmentMeta, MultiFileCheckpointConfig, MultiFileRecovery,
SegmentWriter,
},
operations::CheckpointExecutor,
strategies::CheckpointStrategy,
};
use parking_lot::{Condvar, Mutex};
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Lifecycle states of a checkpoint operation.
///
/// Within this file the manager is observed to move
/// Idle -> Collecting -> Processing -> Complete/Failed; the remaining
/// variants (Initializing, Flushing, Validating) are never assigned here —
/// presumably set by the executor. TODO confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckpointState {
Idle,
Initializing,
Collecting,
Processing,
Flushing,
Validating,
Complete,
Failed,
}
impl Default for CheckpointState {
fn default() -> Self {
CheckpointState::Idle
}
}
/// Coordinates WAL checkpointing: tracks dirty blocks, decides when a
/// checkpoint is due, and drives the executor. Every mutable piece sits
/// behind a `parking_lot` mutex so the manager can be shared across
/// threads via `Arc`.
pub struct V2WALCheckpointManager {
config: V2WALConfig,
// Append-mode checkpoint file, buffered; flushed explicitly on shutdown.
checkpoint_file: Arc<Mutex<BufWriter<File>>>,
// Lifecycle/bookkeeping; waiters block on `checkpoint_cv` against this.
state: Arc<Mutex<CheckpointManagerState>>,
strategy: Arc<Mutex<CheckpointStrategy>>,
dirty_blocks: Arc<Mutex<DirtyBlockTracker>>,
executor: Arc<Mutex<CheckpointExecutor>>,
// Signalled whenever a checkpoint attempt finishes (success or failure).
checkpoint_cv: Arc<Condvar>,
shutdown_flag: Arc<Mutex<bool>>,
// `Some` switches checkpointing to the multi-file (segmented) path.
multi_file_config: Option<Arc<Mutex<MultiFileCheckpointConfig>>>,
}
/// Mutable bookkeeping shared behind the manager's `state` mutex.
#[derive(Debug)]
pub struct CheckpointManagerState {
// Where the current/last checkpoint attempt is in its lifecycle.
pub current_state: CheckpointState,
// Highest LSN recorded as checkpointed.
pub checkpointed_lsn: u64,
// True while a checkpoint attempt runs; guards against double entry.
pub in_progress: bool,
pub last_checkpoint: Option<Instant>,
// Monotonic id, bumped once per checkpoint attempt.
pub current_operation_id: u64,
pub completed_checkpoints: u64,
pub failed_attempts: u64,
pub checkpoint_start_time: Option<Instant>,
// Counter consumed by TransactionCount/Adaptive strategies.
pub transactions_since_checkpoint: u64,
// WAL size observed at the last successful (forced) checkpoint.
pub checkpointed_wal_size: u64,
}
/// Fresh, idle state: no checkpoint history, every counter at zero.
impl Default for CheckpointManagerState {
fn default() -> Self {
Self {
current_state: CheckpointState::Idle,
checkpointed_lsn: 0,
in_progress: false,
last_checkpoint: None,
current_operation_id: 0,
completed_checkpoints: 0,
failed_attempts: 0,
checkpoint_start_time: None,
transactions_since_checkpoint: 0,
checkpointed_wal_size: 0,
}
}
}
/// Policy applied when the global dirty-block set reaches capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DirtyBlockOverflowStrategy {
// Refuse the new dirty block with a resource error.
Reject,
// Fail with a "checkpoint required" error that `mark_block_dirty`
// detects and answers by forcing a checkpoint, then retrying.
ForceCheckpoint,
// Evict the oldest global blocks into the `DiskOverflowStore`.
SpillToDisk,
// Move blocks with cluster affinity into their per-cluster sets.
HierarchicalPromotion,
}
impl Default for DirtyBlockOverflowStrategy {
    /// Rejecting new dirty blocks is the conservative default policy.
    fn default() -> Self {
        Self::Reject
    }
}
/// Tracker limits come from the module-wide constants; the overflow
/// policy defaults to `Reject` (via `new`).
impl Default for DirtyBlockTracker {
fn default() -> Self {
Self::new(MAX_DIRTY_BLOCKS_PER_CLUSTER, MAX_GLOBAL_DIRTY_BLOCKS)
}
}
/// In-memory index of dirty blocks evicted ("spilled") from the global
/// set. NOTE(review): only the index lives here — nothing in this file
/// writes under `spill_path`; confirm the on-disk side elsewhere.
#[derive(Debug)]
pub struct DiskOverflowStore {
spill_path: PathBuf,
// block offset -> (WAL timestamp, wall-clock time of the spill)
spilled_blocks: HashMap<u64, (u64, SystemTime)>,
max_spilled_blocks: usize,
}
impl DiskOverflowStore {
    /// Build an empty overflow store rooted at `spill_path`, refusing to
    /// track more than `max_spilled_blocks` entries.
    pub fn new(spill_path: PathBuf, max_spilled_blocks: usize) -> Self {
        Self {
            spill_path,
            spilled_blocks: HashMap::new(),
            max_spilled_blocks,
        }
    }
    /// Record `block_offset` as spilled, pairing its WAL timestamp with
    /// the wall-clock moment of the spill. Errors once at capacity.
    pub fn add_spilled_block(&mut self, block_offset: u64, timestamp: u64) -> CheckpointResult<()> {
        if self.spilled_blocks.len() < self.max_spilled_blocks {
            self.spilled_blocks
                .insert(block_offset, (timestamp, SystemTime::now()));
            Ok(())
        } else {
            Err(CheckpointError::resource(
                "Disk overflow store capacity exceeded",
            ))
        }
    }
    /// Read-only view of every spilled block, keyed by offset.
    pub fn get_spilled_blocks(&self) -> &HashMap<u64, (u64, SystemTime)> {
        &self.spilled_blocks
    }
    /// Forget one spilled block; reports whether it was present.
    pub fn remove_spilled_block(&mut self, block_offset: u64) -> bool {
        self.spilled_blocks.remove(&block_offset).is_some()
    }
    /// Drop every tracked spill record.
    pub fn clear(&mut self) {
        self.spilled_blocks.clear();
    }
    /// Number of blocks currently spilled.
    pub fn len(&self) -> usize {
        self.spilled_blocks.len()
    }
    /// True when nothing has been spilled.
    pub fn is_empty(&self) -> bool {
        self.spilled_blocks.is_empty()
    }
}
/// Tracks which storage blocks were modified since the last checkpoint,
/// split into per-cluster sets and a bounded global set with a pluggable
/// overflow policy.
#[derive(Debug)]
pub struct DirtyBlockTracker {
cluster_dirty_blocks: HashMap<i64, HashSet<u64>>,
global_dirty_blocks: HashSet<u64>,
// Last-touch timestamps (seconds since epoch) used to pick spill victims.
block_timestamps: HashMap<u64, u64>,
block_access_counts: HashMap<u64, u64>,
// Optional metadata (cluster affinity etc.) consulted by promotion.
block_metadata: HashMap<u64, BlockMetadata>,
max_blocks_per_cluster: usize,
max_global_blocks: usize,
overflow_strategy: DirtyBlockOverflowStrategy,
// Present only after `enable_spill_to_disk`; required by `SpillToDisk`.
overflow_store: Option<DiskOverflowStore>,
}
/// Descriptive data for a dirty block, used by overflow handling (the
/// cluster affinity drives hierarchical promotion).
#[derive(Debug, Clone)]
pub struct BlockMetadata {
pub size: u64,
// Cluster this block belongs to, if any; promotion targets this set.
pub cluster_key: Option<i64>,
pub block_type: V2BlockType,
pub access_pattern: AccessPattern,
// NOTE(review): priority is stored but never consulted in this file —
// presumably used by the executor; confirm before relying on it.
pub priority: u8,
}
/// Kind of on-disk structure a dirty block holds. NOTE(review): not
/// matched on anywhere in this file — semantics presumably defined by the
/// V2 storage layer; confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum V2BlockType {
NodeRecord,
EdgeCluster,
StringTable,
FreeSpace,
Metadata,
Unknown,
}
/// Observed access pattern for a block. NOTE(review): recorded in
/// `BlockMetadata` but not consulted in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AccessPattern {
Sequential,
Random,
Burst,
Sparse,
}
/// Result snapshot returned by a checkpoint run.
#[derive(Debug, Clone)]
pub struct CheckpointProgress {
// (start, end) LSNs covered by this checkpoint.
pub lsn_range: (u64, u64),
pub total_records: u64,
pub processed_records: u64,
pub flushed_blocks: u64,
pub completion_percentage: f64,
pub checkpoint_start: Instant,
}
impl V2WALCheckpointManager {
/// Build a single-file checkpoint manager: validates the configuration,
/// ensures the checkpoint directory exists, and opens the checkpoint
/// file in append mode.
pub fn create(config: V2WALConfig, strategy: CheckpointStrategy) -> CheckpointResult<Self> {
    Self::validate_configuration(&config)?;
    Self::ensure_checkpoint_directory(&config)?;
    let file = Self::create_checkpoint_file(&config)?;
    let executor = CheckpointExecutor::new(config.clone())?;
    let tracker = DirtyBlockTracker::new(MAX_DIRTY_BLOCKS_PER_CLUSTER, MAX_GLOBAL_DIRTY_BLOCKS);
    Ok(Self {
        config,
        checkpoint_file: Arc::new(Mutex::new(BufWriter::new(file))),
        state: Arc::new(Mutex::new(CheckpointManagerState::default())),
        strategy: Arc::new(Mutex::new(strategy)),
        dirty_blocks: Arc::new(Mutex::new(tracker)),
        executor: Arc::new(Mutex::new(executor)),
        checkpoint_cv: Arc::new(Condvar::new()),
        shutdown_flag: Arc::new(Mutex::new(false)),
        multi_file_config: None,
    })
}
/// Build a manager that writes checkpoints across multiple segment files.
/// The segment layout is validated first so a bad `multi_file_config` is
/// rejected before any file is touched.
pub fn with_multi_file(
    config: V2WALConfig,
    strategy: CheckpointStrategy,
    multi_file_config: MultiFileCheckpointConfig,
) -> CheckpointResult<Self> {
    multi_file_config.validate()?;
    Self::validate_configuration(&config)?;
    Self::ensure_checkpoint_directory(&config)?;
    let file = Self::create_checkpoint_file(&config)?;
    let executor = CheckpointExecutor::new(config.clone())?;
    let tracker = DirtyBlockTracker::new(MAX_DIRTY_BLOCKS_PER_CLUSTER, MAX_GLOBAL_DIRTY_BLOCKS);
    Ok(Self {
        config,
        checkpoint_file: Arc::new(Mutex::new(BufWriter::new(file))),
        state: Arc::new(Mutex::new(CheckpointManagerState::default())),
        strategy: Arc::new(Mutex::new(strategy)),
        dirty_blocks: Arc::new(Mutex::new(tracker)),
        executor: Arc::new(Mutex::new(executor)),
        checkpoint_cv: Arc::new(Condvar::new()),
        shutdown_flag: Arc::new(Mutex::new(false)),
        multi_file_config: Some(Arc::new(Mutex::new(multi_file_config))),
    })
}
/// Snapshot of the manager's current lifecycle state.
pub fn get_state(&self) -> CheckpointState {
    self.state.lock().current_state.clone()
}
/// True while a checkpoint attempt is running.
pub fn is_checkpoint_in_progress(&self) -> bool {
    self.state.lock().in_progress
}
/// Highest LSN recorded as checkpointed so far.
pub fn get_last_checkpointed_lsn(&self) -> u64 {
    self.state.lock().checkpointed_lsn
}
/// Returns `(completed checkpoints, failed attempts, current operation id)`.
pub fn get_checkpoint_statistics(&self) -> (u64, u64, u64) {
    let s = self.state.lock();
    (s.completed_checkpoints, s.failed_attempts, s.current_operation_id)
}
/// Swap the dirty-block overflow policy at runtime.
pub fn set_overflow_strategy(&self, strategy: DirtyBlockOverflowStrategy) {
    self.dirty_blocks.lock().set_overflow_strategy(strategy);
}
/// Current dirty-block overflow policy.
pub fn get_overflow_strategy(&self) -> DirtyBlockOverflowStrategy {
    self.dirty_blocks.lock().get_overflow_strategy()
}
/// Whether the segmented (multi-file) checkpoint path is configured.
pub fn is_multi_file_enabled(&self) -> bool {
self.multi_file_config.is_some()
}
pub fn get_multi_file_config(&self) -> Option<MultiFileCheckpointConfig> {
self.multi_file_config.as_ref().map(|cfg| {
let guard = cfg.lock();
MultiFileCheckpointConfig {
max_segment_size: guard.max_segment_size,
base_path: guard.base_path.clone(),
max_segments: guard.max_segments,
}
})
}
/// Record a single block as dirty, either under a cluster or globally.
///
/// Offset validation (non-zero, aligned to the V2 graph block size) is
/// performed *before* taking the tracker lock — the original acquired the
/// lock first and held it through validation for no benefit. If the
/// global set is full and the overflow policy demands a checkpoint, one
/// forced checkpoint is attempted and the insert is retried exactly once.
pub fn mark_block_dirty(
    &self,
    block_offset: u64,
    cluster_key: Option<i64>,
) -> CheckpointResult<()> {
    if block_offset == 0 {
        return Err(CheckpointError::validation("Block offset cannot be zero"));
    }
    if block_offset % v2::V2_GRAPH_BLOCK_SIZE != 0 {
        return Err(CheckpointError::validation(format!(
            "Block offset {} is not aligned to V2 graph block size",
            block_offset
        )));
    }
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| CheckpointError::io(format!("Failed to get timestamp: {}", e)))?
        .as_secs();
    let mut dirty_blocks = self.dirty_blocks.lock();
    if let Some(key) = cluster_key {
        dirty_blocks.mark_cluster_block_dirty(key, block_offset, timestamp)?;
        dirty_blocks.update_block_access(block_offset, timestamp);
    } else {
        match dirty_blocks.mark_global_block_dirty(block_offset, timestamp) {
            Ok(()) => {
                dirty_blocks.update_block_access(block_offset, timestamp);
            }
            // Overflow policy asked for a checkpoint: release the lock
            // (force_checkpoint re-acquires it), force one, then retry.
            Err(e) if e.message.contains("checkpoint required") => {
                drop(dirty_blocks);
                let _ = self.force_checkpoint();
                dirty_blocks = self.dirty_blocks.lock();
                dirty_blocks.mark_global_block_dirty(block_offset, timestamp)?;
                dirty_blocks.update_block_access(block_offset, timestamp);
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
/// Mark a batch of blocks dirty, returning how many succeeded. Failures
/// on individual blocks are logged and skipped rather than aborting the
/// batch.
///
/// BUG FIX: the original acquired `self.dirty_blocks.lock()` here and
/// then called `mark_block_dirty`, which locks the same mutex again —
/// parking_lot mutexes are not reentrant, so the first iteration
/// self-deadlocked. `mark_block_dirty` manages its own locking; no outer
/// lock is needed (or wanted — it would also break the
/// checkpoint-and-retry path inside `mark_block_dirty`).
pub fn mark_blocks_dirty<I>(
    &self,
    block_offsets: I,
    cluster_key: Option<i64>,
) -> CheckpointResult<u64>
where
    I: IntoIterator<Item = u64>,
{
    let mut marked_count = 0;
    for block_offset in block_offsets {
        match self.mark_block_dirty(block_offset, cluster_key) {
            Ok(()) => marked_count += 1,
            Err(e) => eprintln!(
                "Warning: Failed to mark block {} as dirty: {}",
                block_offset, e
            ),
        }
    }
    Ok(marked_count)
}
/// Decide whether a checkpoint should run now, per the configured
/// strategy. Always `false` while shutting down, while another checkpoint
/// is in flight, or after a failure left the manager in `Failed`.
///
/// The shutdown flag is tested before any other lock is taken — the
/// original acquired the strategy and state locks first and only then
/// consulted the flag, holding two locks it never needed on the shutdown
/// path.
pub fn should_checkpoint(&self) -> CheckpointResult<bool> {
    if *self.shutdown_flag.lock() {
        return Ok(false);
    }
    let strategy = self.strategy.lock();
    let state = self.state.lock();
    if state.in_progress || matches!(state.current_state, CheckpointState::Failed) {
        return Ok(false);
    }
    let dirty_blocks = self.dirty_blocks.lock();
    self.evaluate_checkpoint_strategy(&strategy, &dirty_blocks, &state)
}
/// Current on-disk size of the WAL file, in bytes.
pub fn get_wal_size(&self) -> CheckpointResult<u64> {
    let meta = std::fs::metadata(&self.config.wal_path)
        .map_err(|e| CheckpointError::io(format!("Failed to get WAL size: {}", e)))?;
    Ok(meta.len())
}
/// Run a checkpoint unconditionally (strategy is not consulted). Wakes
/// every `wait_for_checkpoint` waiter when the attempt finishes, success
/// or failure.
pub fn force_checkpoint(&self) -> CheckpointResult<CheckpointProgress> {
    let start_time = Instant::now();
    {
        let mut state = self.state.lock();
        if state.in_progress {
            return Err(CheckpointError::state("Checkpoint already in progress"));
        }
        state.in_progress = true;
        state.current_state = CheckpointState::Collecting;
        state.checkpoint_start_time = Some(start_time);
        state.current_operation_id += 1;
    }
    let result = self.execute_checkpoint(start_time, true);
    {
        let mut state = self.state.lock();
        state.in_progress = false;
        match &result {
            Ok(progress) => {
                state.current_state = CheckpointState::Complete;
                state.last_checkpoint = Some(start_time);
                state.completed_checkpoints += 1;
                // BUG FIX: the original called self.get_last_checkpointed_lsn()
                // here, which re-locks `self.state` while this guard is held —
                // parking_lot mutexes are not reentrant, so that deadlocks
                // (and even if it returned, it would just re-assign the old
                // value). Record the LSN this checkpoint actually reached.
                state.checkpointed_lsn = progress.lsn_range.1;
                state.transactions_since_checkpoint = 0;
                state.checkpointed_wal_size = self.get_wal_size().unwrap_or(0);
            }
            Err(_) => {
                state.current_state = CheckpointState::Failed;
                state.failed_attempts += 1;
            }
        }
    }
    self.checkpoint_cv.notify_all();
    result
}
/// Run a checkpoint only if the configured strategy says one is due.
///
/// CONSISTENCY FIX: on success the same bookkeeping as `force_checkpoint`
/// is now applied. The original updated `checkpointed_lsn`,
/// `transactions_since_checkpoint` and `checkpointed_wal_size` only on
/// the forced path, so `TransactionCount`/`SizeThreshold`/`Adaptive`
/// strategies never saw their counters reset after a strategy-driven
/// checkpoint and would fire again immediately.
pub fn checkpoint(&self) -> CheckpointResult<CheckpointProgress> {
    let start_time = Instant::now();
    if !self.should_checkpoint()? {
        return Err(CheckpointError::state(
            "Checkpoint not required based on strategy",
        ));
    }
    {
        let mut state = self.state.lock();
        if state.in_progress {
            return Err(CheckpointError::state("Checkpoint already in progress"));
        }
        state.in_progress = true;
        state.current_state = CheckpointState::Collecting;
        state.checkpoint_start_time = Some(start_time);
        state.current_operation_id += 1;
    }
    let result = self.execute_checkpoint(start_time, false);
    {
        let mut state = self.state.lock();
        state.in_progress = false;
        match &result {
            Ok(progress) => {
                state.current_state = CheckpointState::Complete;
                state.last_checkpoint = Some(start_time);
                state.completed_checkpoints += 1;
                state.checkpointed_lsn = progress.lsn_range.1;
                state.transactions_since_checkpoint = 0;
                state.checkpointed_wal_size = self.get_wal_size().unwrap_or(0);
            }
            Err(_) => {
                state.current_state = CheckpointState::Failed;
                state.failed_attempts += 1;
            }
        }
    }
    self.checkpoint_cv.notify_all();
    result
}
/// Block until no checkpoint is in progress, or until `timeout` elapses.
/// Returns `true` when the in-progress flag cleared, `false` on timeout.
pub fn wait_for_checkpoint(&self, timeout: Duration) -> bool {
    let wait_start = Instant::now();
    let mut guard = self.state.lock();
    while guard.in_progress {
        let remaining = timeout.saturating_sub(wait_start.elapsed());
        if remaining.is_zero() {
            return false;
        }
        // The condvar is notified at the end of every checkpoint attempt.
        if self.checkpoint_cv.wait_for(&mut guard, remaining).timed_out() {
            return false;
        }
    }
    true
}
pub fn shutdown(&self) -> CheckpointResult<()> {
{
let mut shutdown_flag = self.shutdown_flag.lock();
*shutdown_flag = true;
}
if self.is_checkpoint_in_progress() {
if !self.wait_for_checkpoint(Duration::from_secs(30)) {
return Err(CheckpointError::timeout(
"Checkpoint did not complete during shutdown",
));
}
}
if self.should_checkpoint()? {
let _ = self.force_checkpoint();
}
{
let mut checkpoint_file = self.checkpoint_file.lock();
checkpoint_file.flush().map_err(|e| {
CheckpointError::io(format!("Failed to flush checkpoint file: {}", e))
})?;
}
Ok(())
}
/// Reject configurations the checkpoint subsystem cannot work with:
/// anything `V2WALConfig::validate` refuses, or a checkpoint path that
/// has no parent directory.
fn validate_configuration(config: &V2WALConfig) -> CheckpointResult<()> {
    config
        .validate()
        .map_err(|e| CheckpointError::configuration(e.to_string()))?;
    match config.checkpoint_path.as_path().parent() {
        Some(_) => Ok(()),
        None => Err(CheckpointError::configuration(
            "Checkpoint path must have a valid parent directory",
        )),
    }
}
/// Create the checkpoint file's parent directory (and any missing
/// ancestors). A path without a parent is a no-op here; it is rejected by
/// `validate_configuration`.
fn ensure_checkpoint_directory(config: &V2WALConfig) -> CheckpointResult<()> {
    match config.checkpoint_path.parent() {
        Some(parent) => std::fs::create_dir_all(parent).map_err(|e| {
            CheckpointError::io(format!("Failed to create checkpoint directory: {}", e))
        }),
        None => Ok(()),
    }
}
/// Open the checkpoint file in append mode, creating it if absent.
fn create_checkpoint_file(config: &V2WALConfig) -> CheckpointResult<File> {
    std::fs::OpenOptions::new()
        .create(true)
        .append(true) // append implies write access
        .open(&config.checkpoint_path)
        .map_err(|e| CheckpointError::io(format!("Failed to create checkpoint file: {}", e)))
}
/// Shared checkpoint driver for `checkpoint`/`force_checkpoint`: flips
/// state to `Processing`, then runs either the multi-file path or the
/// single-file incremental executor over the full LSN range (0..=MAX).
fn execute_checkpoint(
&self,
_start_time: Instant,
_force: bool,
) -> CheckpointResult<CheckpointProgress> {
{
let mut state = self.state.lock();
state.current_state = CheckpointState::Processing;
}
if self.multi_file_config.is_some() {
return self.execute_multi_file_checkpoint();
}
// Executor, state, and dirty-block locks are all held across the
// executor call. The state guard is moved into the error closure so it
// can be dropped there before re-locking `self.state` to record Failed
// (re-locking while the guard is live would deadlock).
let executor = self.executor.lock();
let manager_state = self.state.lock();
let dirty_blocks = self.dirty_blocks.lock();
let progress = executor
.execute_incremental_checkpoint(
&manager_state.current_state,
&*dirty_blocks,
0,
u64::MAX,
)
.map_err(|e| {
drop(manager_state); let mut state = self.state.lock();
state.current_state = CheckpointState::Failed;
e
})?;
Ok(progress)
}
/// Multi-file checkpoint path: run the incremental executor, finalize the
/// results through a `SegmentWriter`, then persist a manifest describing
/// every completed segment for recovery.
///
/// NOTE(review): the multi-file config guard is held for the entire run,
/// so config readers block until the checkpoint finishes — confirm that
/// is intended.
fn execute_multi_file_checkpoint(&self) -> CheckpointResult<CheckpointProgress> {
let config_arc = self
.multi_file_config
.as_ref()
.ok_or_else(|| CheckpointError::state("Multi-file config not available"))?;
let config = config_arc.lock();
// Read the resume LSN, then release the state lock before the long
// executor run (it is re-taken just below).
let state = self.state.lock();
let lsn_start = state.checkpointed_lsn;
drop(state);
let mut segment_writer = SegmentWriter::create(config.clone(), 0, lsn_start)?;
let executor = self.executor.lock();
let manager_state = self.state.lock();
let dirty_blocks = self.dirty_blocks.lock();
let progress = executor
.execute_incremental_checkpoint(
&manager_state.current_state,
&*dirty_blocks,
0,
u64::MAX,
)
.map_err(|e| {
// Drop the state guard before re-locking to record the failure;
// parking_lot mutexes are not reentrant.
drop(manager_state);
let mut state = self.state.lock();
state.current_state = CheckpointState::Failed;
e
})?;
let lsn_end = progress.lsn_range.1;
let block_count = progress.flushed_blocks;
segment_writer.finalize(lsn_end, block_count)?;
// The manifest records every finished segment so recovery can locate
// and verify them.
let mut manifest = CheckpointManifest::new();
manifest.timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
for segment in segment_writer.completed_segments() {
manifest.add_segment(CheckpointSegmentMeta {
index: segment.segment_index,
lsn_start: segment.lsn_range.0,
lsn_end: segment.lsn_range.1,
block_count: segment.block_count,
checksum: segment.checksum,
size: segment.size,
});
}
MultiFileRecovery::write_manifest(&manifest, &config.base_path)?;
Ok(progress)
}
/// Apply `strategy` to the current state and decide whether a checkpoint
/// is due. `_dirty_blocks` is accepted for future use but not consulted
/// by any current strategy.
///
/// WAL-size probes now go through the existing `get_wal_size` helper
/// (missing/unreadable WAL counts as 0 bytes, exactly as before) instead
/// of duplicating the `fs::metadata` call in two arms as the original did.
fn evaluate_checkpoint_strategy(
    &self,
    strategy: &CheckpointStrategy,
    _dirty_blocks: &DirtyBlockTracker,
    state: &CheckpointManagerState,
) -> CheckpointResult<bool> {
    match strategy {
        // Never checkpointed yet => due immediately.
        CheckpointStrategy::TimeInterval(interval) => Ok(state
            .last_checkpoint
            .map(|t| t.elapsed() >= *interval)
            .unwrap_or(true)),
        CheckpointStrategy::TransactionCount(threshold) => {
            Ok(state.transactions_since_checkpoint >= *threshold)
        }
        CheckpointStrategy::SizeThreshold(threshold) => {
            Ok(self.get_wal_size().unwrap_or(0) >= *threshold)
        }
        CheckpointStrategy::Adaptive {
            min_interval,
            max_wal_size,
            max_transactions,
        } => {
            // Rate-limit by min_interval, then trigger on either WAL
            // growth or transaction volume.
            let time_ok = state
                .last_checkpoint
                .map(|t| t.elapsed() >= *min_interval)
                .unwrap_or(true);
            let size_ok = self.get_wal_size().unwrap_or(0) >= *max_wal_size;
            let tx_ok = state.transactions_since_checkpoint >= *max_transactions;
            Ok(time_ok && (size_ok || tx_ok))
        }
    }
}
}
impl DirtyBlockTracker {
pub fn new(max_blocks_per_cluster: usize, max_global_blocks: usize) -> Self {
Self {
cluster_dirty_blocks: HashMap::new(),
global_dirty_blocks: HashSet::new(),
block_timestamps: HashMap::new(),
block_access_counts: HashMap::new(),
block_metadata: HashMap::new(),
max_blocks_per_cluster,
max_global_blocks,
overflow_strategy: DirtyBlockOverflowStrategy::default(),
overflow_store: None,
}
}
/// Construct a tracker with explicit capacity limits and an explicit
/// overflow policy. This is the canonical constructor; `new` delegates
/// here conceptually with the default policy.
pub fn with_overflow_strategy(
    max_blocks_per_cluster: usize,
    max_global_blocks: usize,
    overflow_strategy: DirtyBlockOverflowStrategy,
) -> Self {
    Self {
        cluster_dirty_blocks: HashMap::default(),
        global_dirty_blocks: HashSet::default(),
        block_timestamps: HashMap::default(),
        block_access_counts: HashMap::default(),
        block_metadata: HashMap::default(),
        max_blocks_per_cluster,
        max_global_blocks,
        overflow_strategy,
        // Spill storage is opt-in via `enable_spill_to_disk`.
        overflow_store: None,
    }
}
/// Replace the overflow policy; takes effect on the next capacity hit.
pub fn set_overflow_strategy(&mut self, strategy: DirtyBlockOverflowStrategy) {
self.overflow_strategy = strategy;
}
/// Current overflow policy.
pub fn get_overflow_strategy(&self) -> DirtyBlockOverflowStrategy {
self.overflow_strategy
}
/// Attach an overflow store; required before `SpillToDisk` can work.
pub fn enable_spill_to_disk(&mut self, spill_path: PathBuf, max_spilled_blocks: usize) {
self.overflow_store = Some(DiskOverflowStore::new(spill_path, max_spilled_blocks));
}
/// Read-only access to the overflow store, if one is attached.
pub fn get_overflow_store(&self) -> Option<&DiskOverflowStore> {
self.overflow_store.as_ref()
}
/// Track `block_offset` as dirty under `cluster_key`, enforcing the
/// per-cluster cap. The timestamp is intentionally unused here; callers
/// record access times separately via `update_block_access`.
pub fn mark_cluster_block_dirty(
    &mut self,
    cluster_key: i64,
    block_offset: u64,
    _timestamp: u64,
) -> CheckpointResult<()> {
    let cluster_blocks = self.cluster_dirty_blocks.entry(cluster_key).or_default();
    if cluster_blocks.len() < self.max_blocks_per_cluster {
        cluster_blocks.insert(block_offset);
        Ok(())
    } else {
        Err(CheckpointError::resource(format!(
            "Maximum dirty blocks per cluster exceeded for cluster {}",
            cluster_key
        )))
    }
}
/// Track a global (non-cluster) dirty block. When the global set is at
/// capacity the configured overflow policy runs first:
/// - `Reject`: fail with a resource error.
/// - `ForceCheckpoint`: fail with a "checkpoint required" message that
///   `mark_block_dirty` detects and retries after forcing a checkpoint.
/// - `SpillToDisk`: evict the 1000 oldest blocks to the overflow store
///   (errors if no store is attached), then insert.
/// - `HierarchicalPromotion`: move cluster-affiliated blocks into their
///   cluster sets, then insert.
pub fn mark_global_block_dirty(
&mut self,
block_offset: u64,
timestamp: u64,
) -> CheckpointResult<()> {
if self.global_dirty_blocks.len() >= self.max_global_blocks {
match self.overflow_strategy {
DirtyBlockOverflowStrategy::Reject => {
return Err(CheckpointError::resource(
"Maximum global dirty blocks exceeded",
));
}
DirtyBlockOverflowStrategy::ForceCheckpoint => {
return Err(CheckpointError::checkpoint_required(
"Dirty block overflow - checkpoint required",
));
}
DirtyBlockOverflowStrategy::SpillToDisk => {
self.spill_oldest_blocks(1000)?;
}
DirtyBlockOverflowStrategy::HierarchicalPromotion => {
self.promote_to_hierarchical()?;
}
}
}
self.global_dirty_blocks.insert(block_offset);
self.block_timestamps.insert(block_offset, timestamp);
Ok(())
}
/// Evict up to `count` of the oldest global dirty blocks (by recorded
/// timestamp) into the overflow store. Timestamps/metadata for spilled
/// blocks are intentionally left in place, matching the original.
///
/// BUG FIX: the original removed each block from the global set *before*
/// inserting it into the store, so a store-capacity error mid-loop lost a
/// dirty block entirely. We now insert first and only remove on success;
/// on error the block is still tracked as dirty.
pub fn spill_oldest_blocks(&mut self, count: usize) -> CheckpointResult<()> {
    let overflow_store = match self.overflow_store.as_mut() {
        Some(store) => store,
        None => {
            return Err(CheckpointError::resource(
                "Spill-to-disk overflow requires overflow store to be enabled",
            ))
        }
    };
    let mut oldest_blocks: Vec<(u64, u64)> = self
        .global_dirty_blocks
        .iter()
        .filter_map(|&block| self.block_timestamps.get(&block).map(|&ts| (block, ts)))
        .collect();
    oldest_blocks.sort_unstable_by_key(|&(_, ts)| ts);
    for &(block_offset, timestamp) in oldest_blocks.iter().take(count) {
        overflow_store.add_spilled_block(block_offset, timestamp)?;
        self.global_dirty_blocks.remove(&block_offset);
    }
    Ok(())
}
/// Relieve pressure on the global set by moving blocks that have cluster
/// affinity (per `block_metadata`) into their per-cluster sets. At most
/// 10% of the global capacity is promoted per call. Errors when no block
/// carries cluster metadata.
///
/// BUG FIX: the original removed each candidate from the global set
/// unconditionally and only inserted it into the cluster set when that
/// set had room — a full target cluster silently *lost* the dirty block,
/// meaning it would never be checkpointed. The block is now dropped from
/// the global set only after it actually landed in a cluster set.
pub fn promote_to_hierarchical(&mut self) -> CheckpointResult<()> {
    let blocks_to_promote: Vec<(u64, i64)> = self
        .global_dirty_blocks
        .iter()
        .filter_map(|&block| {
            self.block_metadata
                .get(&block)
                .and_then(|meta| meta.cluster_key.map(|key| (block, key)))
        })
        .collect();
    if blocks_to_promote.is_empty() {
        return Err(CheckpointError::resource(
            "No cluster-affinity blocks available for hierarchical promotion",
        ));
    }
    let promote_count = (self.max_global_blocks / 10).min(blocks_to_promote.len());
    for &(block_offset, cluster_key) in blocks_to_promote.iter().take(promote_count) {
        let cluster_blocks = self.cluster_dirty_blocks.entry(cluster_key).or_default();
        if cluster_blocks.len() < self.max_blocks_per_cluster {
            cluster_blocks.insert(block_offset);
            self.global_dirty_blocks.remove(&block_offset);
        }
    }
    Ok(())
}
/// Refresh a block's last-touched timestamp and bump its access counter.
pub fn update_block_access(&mut self, block_offset: u64, timestamp: u64) {
    self.block_timestamps.insert(block_offset, timestamp);
    let count = self.block_access_counts.entry(block_offset).or_default();
    *count += 1;
}
/// Every dirty block offset (cluster-scoped and global), sorted
/// ascending for sequential-friendly checkpoint I/O.
pub fn get_dirty_blocks_for_checkpoint(&self) -> Vec<u64> {
    let mut blocks: Vec<u64> = self
        .cluster_dirty_blocks
        .values()
        .flat_map(|set| set.iter().copied())
        .chain(self.global_dirty_blocks.iter().copied())
        .collect();
    blocks.sort_unstable();
    blocks
}
/// Drop all bookkeeping for blocks a completed checkpoint has persisted:
/// global/cluster membership, timestamps, access counts, and metadata.
/// Cluster sets left empty are pruned afterwards.
pub fn clear_checkpointed_blocks(&mut self, checkpointed_blocks: &[u64]) {
    for &block_offset in checkpointed_blocks {
        self.global_dirty_blocks.remove(&block_offset);
        self.block_timestamps.remove(&block_offset);
        self.block_access_counts.remove(&block_offset);
        self.block_metadata.remove(&block_offset);
        for cluster_blocks in self.cluster_dirty_blocks.values_mut() {
            cluster_blocks.remove(&block_offset);
        }
    }
    self.cluster_dirty_blocks
        .retain(|_, blocks| !blocks.is_empty());
}
pub fn get_statistics(&self) -> (usize, usize) {
let cluster_blocks: usize = self
.cluster_dirty_blocks
.values()
.map(|set| set.len())
.sum();
let global_blocks = self.global_dirty_blocks.len();
(cluster_blocks, global_blocks)
}
/// Read-only view of the global dirty-block set.
pub fn global_dirty_blocks(&self) -> &HashSet<u64> {
&self.global_dirty_blocks
}
/// Read-only view of per-block last-touch timestamps.
pub fn block_timestamps(&self) -> &HashMap<u64, u64> {
&self.block_timestamps
}
/// Read-only view of the per-cluster dirty-block sets.
pub fn cluster_dirty_blocks(&self) -> &HashMap<i64, HashSet<u64>> {
&self.cluster_dirty_blocks
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::GraphFile;
use std::time::Duration;
use tempfile::tempdir;
#[test]
fn test_checkpoint_manager_creation() -> CheckpointResult<()> {
    let temp_dir = tempdir().unwrap();
    let v2_graph_path = temp_dir.path().join("test.v2");
    let _graph_file = GraphFile::create(&v2_graph_path).map_err(|e| {
        CheckpointError::v2_integration(format!("Failed to create test graph file: {}", e))
    })?;
    // Small limits keep the test quick; compression is irrelevant here.
    let mut config = V2WALConfig::for_graph_file(&v2_graph_path);
    config.max_wal_size = 64 * 1024 * 1024;
    config.buffer_size = 1024 * 1024;
    config.checkpoint_interval = 100;
    config.enable_compression = false;
    let strategy = CheckpointStrategy::TimeInterval(Duration::from_secs(60));
    let manager = V2WALCheckpointManager::create(config, strategy)?;
    // A fresh manager is idle with nothing checkpointed.
    assert_eq!(manager.get_state(), CheckpointState::Idle);
    assert!(!manager.is_checkpoint_in_progress());
    assert_eq!(manager.get_last_checkpointed_lsn(), 0);
    Ok(())
}
#[test]
fn test_mark_block_dirty() -> CheckpointResult<()> {
    let temp_dir = tempdir().unwrap();
    let v2_graph_path = temp_dir.path().join("test.v2");
    let _graph_file = GraphFile::create(&v2_graph_path).map_err(|e| {
        CheckpointError::v2_integration(format!("Failed to create test graph file: {}", e))
    })?;
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let strategy = CheckpointStrategy::TimeInterval(Duration::from_secs(60));
    let manager = V2WALCheckpointManager::create(config, strategy)?;
    // One cluster-scoped block and one global block.
    manager.mark_block_dirty(4096, Some(42))?;
    manager.mark_block_dirty(8192, None)?;
    let (cluster_blocks, global_blocks) = manager.dirty_blocks.lock().get_statistics();
    assert_eq!(cluster_blocks, 1);
    assert_eq!(global_blocks, 1);
    Ok(())
}
#[test]
fn test_mark_invalid_block() {
    let temp_dir = tempdir().unwrap();
    let v2_graph_path = temp_dir.path().join("test.v2");
    let _graph_file =
        GraphFile::create(&v2_graph_path).expect("Failed to create test V2 graph file");
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let strategy = CheckpointStrategy::TimeInterval(Duration::from_secs(60));
    let manager = V2WALCheckpointManager::create(config, strategy).unwrap();
    // Offset 100 is not aligned to the V2 block size, so marking it dirty
    // must be rejected as a validation error.
    let result = manager.mark_block_dirty(100, Some(42));
    assert!(result.is_err());
    assert!(matches!(
        result.unwrap_err().kind,
        CheckpointErrorKind::Validation
    ));
}
#[test]
fn test_checkpoint_statistics() -> CheckpointResult<()> {
    let temp_dir = tempdir().unwrap();
    let v2_graph_path = temp_dir.path().join("test.v2");
    let _graph_file = GraphFile::create(&v2_graph_path).map_err(|e| {
        CheckpointError::v2_integration(format!("Failed to create test graph file: {}", e))
    })?;
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let strategy = CheckpointStrategy::TimeInterval(Duration::from_secs(60));
    let manager = V2WALCheckpointManager::create(config, strategy)?;
    // No checkpoints have run, so every counter starts at zero.
    let (completed, failed, operation_id) = manager.get_checkpoint_statistics();
    assert_eq!(completed, 0);
    assert_eq!(failed, 0);
    assert_eq!(operation_id, 0);
    Ok(())
}
#[test]
fn test_dirty_block_tracker_capacity_limits() {
    // Tiny caps: 2 blocks per cluster, 5 global blocks.
    let mut tracker = DirtyBlockTracker::new(2, 5);
    tracker.mark_cluster_block_dirty(1, 4096, 100).unwrap();
    tracker.mark_cluster_block_dirty(1, 8192, 101).unwrap();
    // A third block in the same cluster exceeds the per-cluster cap.
    assert!(tracker.mark_cluster_block_dirty(1, 12288, 102).is_err());
    for i in 0..5u64 {
        tracker
            .mark_global_block_dirty(16384 + i * 4096, 103 + i)
            .unwrap();
    }
    // The sixth global block exceeds the global cap (default policy: Reject).
    assert!(tracker.mark_global_block_dirty(36864, 108).is_err());
}
#[test]
fn test_dirty_block_tracker_operations() {
    let mut tracker = DirtyBlockTracker::new(100, 1000);
    tracker.mark_cluster_block_dirty(1, 4096, 100).unwrap();
    tracker.mark_global_block_dirty(8192, 101).unwrap();
    tracker.mark_cluster_block_dirty(2, 12288, 102).unwrap();
    // Checkpoint view merges cluster-scoped and global blocks.
    let blocks = tracker.get_dirty_blocks_for_checkpoint();
    assert_eq!(blocks.len(), 3);
    for offset in [4096, 8192, 12288] {
        assert!(blocks.contains(&offset));
    }
    // Clearing checkpointed blocks leaves only the untouched one.
    tracker.clear_checkpointed_blocks(&[4096, 8192]);
    let remaining = tracker.get_dirty_blocks_for_checkpoint();
    assert_eq!(remaining, vec![12288]);
}
#[test]
fn test_checkpoint_state_transitions() {
    let temp_dir = tempdir().unwrap();
    let v2_graph_path = temp_dir.path().join("test.v2");
    let _graph_file =
        GraphFile::create(&v2_graph_path).expect("Failed to create test V2 graph file");
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let strategy = CheckpointStrategy::TimeInterval(Duration::from_secs(1));
    let manager = V2WALCheckpointManager::create(config, strategy).unwrap();
    assert_eq!(manager.get_state(), CheckpointState::Idle);
    // With no prior checkpoint the strategy fires immediately; the attempt
    // itself fails here, leaving the manager in the Failed state.
    assert!(manager.checkpoint().is_err());
    assert_eq!(manager.get_state(), CheckpointState::Failed);
}
#[test]
fn test_overflow_strategy_reject() {
    let mut tracker =
        DirtyBlockTracker::with_overflow_strategy(100, 100, DirtyBlockOverflowStrategy::Reject);
    // Fill the global set exactly to capacity.
    for i in 0..100 {
        let result = tracker.mark_global_block_dirty(i * 4096, i as u64);
        assert!(result.is_ok(), "Block {} should succeed", i);
    }
    // One more block is rejected with a resource error.
    let result = tracker.mark_global_block_dirty(101 * 4096, 101);
    assert!(result.is_err());
    assert_eq!(result.unwrap_err().kind, CheckpointErrorKind::Resource);
}
// ForceCheckpoint overflow: hitting capacity yields the sentinel
// "checkpoint required" error that mark_block_dirty keys off.
#[test]
fn test_overflow_strategy_force_checkpoint() {
let mut tracker = DirtyBlockTracker::with_overflow_strategy(
100,
100,
DirtyBlockOverflowStrategy::ForceCheckpoint,
);
for i in 0..100 {
tracker.mark_global_block_dirty(i * 4096, i as u64).unwrap();
}
let result = tracker.mark_global_block_dirty(101 * 4096, 101);
assert!(result.is_err());
assert!(result.unwrap_err().message.contains("checkpoint required"));
}
// SpillToDisk overflow: once the global set is full, further marks evict
// the oldest blocks to the overflow store and then succeed.
#[test]
fn test_overflow_strategy_spill_to_disk() {
let temp_dir = tempdir().unwrap();
let spill_path = temp_dir.path().join("spill");
std::fs::create_dir_all(&spill_path).unwrap();
let mut tracker = DirtyBlockTracker::with_overflow_strategy(
100,
100,
DirtyBlockOverflowStrategy::SpillToDisk,
);
tracker.enable_spill_to_disk(spill_path, 10000);
for i in 0..100 {
tracker.mark_global_block_dirty(i * 4096, i as u64).unwrap();
}
// These exceed capacity and must trigger spills rather than errors.
for i in 100..150 {
let result = tracker.mark_global_block_dirty(i * 4096, i as u64);
assert!(result.is_ok(), "Block {} should succeed after spill", i);
}
let (cluster_count, global_count) = tracker.get_statistics();
assert_eq!(cluster_count, 0);
assert!(
global_count < 150,
"Global count should be less than 150 after spill"
);
let overflow_store = tracker.get_overflow_store();
assert!(overflow_store.is_some());
assert!(overflow_store.unwrap().len() > 0);
}
// HierarchicalPromotion overflow: blocks that carry cluster metadata are
// moved into their cluster sets when the global set fills up.
#[test]
fn test_overflow_strategy_hierarchical() {
let mut tracker = DirtyBlockTracker::with_overflow_strategy(
100,
100,
DirtyBlockOverflowStrategy::HierarchicalPromotion,
);
for i in 0..100 {
let block_offset = i * 4096;
tracker
.mark_global_block_dirty(block_offset, i as u64)
.unwrap();
// Give every other block cluster affinity so promotion has targets.
if i % 2 == 0 {
tracker.block_metadata.insert(
block_offset,
BlockMetadata {
size: 4096,
cluster_key: Some(i as i64 / 10),
block_type: V2BlockType::NodeRecord,
access_pattern: AccessPattern::Sequential,
priority: 1,
},
);
}
}
let result = tracker.mark_global_block_dirty(101 * 4096, 101);
assert!(result.is_ok(), "Hierarchical promotion should succeed");
let (cluster_count, global_count) = tracker.get_statistics();
assert!(
cluster_count > 0,
"Should have cluster blocks after promotion"
);
assert!(
global_count < 100,
"Global count should be reduced after promotion"
);
}
// Larger-scale spill scenario: 60k marks against a 50k cap must all
// succeed, with the overage absorbed by the overflow store.
#[test]
fn test_50k_global_blocks_with_spill() {
let temp_dir = tempdir().unwrap();
let spill_path = temp_dir.path().join("spill");
std::fs::create_dir_all(&spill_path).unwrap();
let max_global = 50000;
let mut tracker = DirtyBlockTracker::with_overflow_strategy(
10000,
max_global,
DirtyBlockOverflowStrategy::SpillToDisk,
);
tracker.enable_spill_to_disk(spill_path, 100000);
for i in 0..60000 {
let block_offset = (i as u64) * 4096;
let result = tracker.mark_global_block_dirty(block_offset, i as u64);
assert!(
result.is_ok(),
"Block {} should succeed with spill-to-disk: {:?}",
i,
result.err()
);
}
let (cluster_count, global_count) = tracker.get_statistics();
assert_eq!(cluster_count, 0);
assert!(
global_count <= max_global + 1000,
"Should stay near capacity"
);
let overflow_store = tracker.get_overflow_store();
assert!(overflow_store.is_some());
let spilled_count = overflow_store.unwrap().len();
assert!(spilled_count > 0, "Should have spilled blocks");
}
#[test]
fn test_overflow_strategy_getters_setters() {
    let mut tracker = DirtyBlockTracker::new(100, 100);
    // Reject is the default policy.
    assert_eq!(
        tracker.get_overflow_strategy(),
        DirtyBlockOverflowStrategy::Reject
    );
    // Every other policy round-trips through the setter.
    for strategy in [
        DirtyBlockOverflowStrategy::ForceCheckpoint,
        DirtyBlockOverflowStrategy::SpillToDisk,
        DirtyBlockOverflowStrategy::HierarchicalPromotion,
    ] {
        tracker.set_overflow_strategy(strategy);
        assert_eq!(tracker.get_overflow_strategy(), strategy);
    }
}
// Direct spill call: evicting 50 of 100 tracked blocks moves exactly 50
// into the store and leaves 50 in the global set.
#[test]
fn test_spill_oldest_blocks() {
let temp_dir = tempdir().unwrap();
let spill_path = temp_dir.path().join("spill");
std::fs::create_dir_all(&spill_path).unwrap();
let mut tracker = DirtyBlockTracker::with_overflow_strategy(
100,
100,
DirtyBlockOverflowStrategy::SpillToDisk,
);
tracker.enable_spill_to_disk(spill_path, 10000);
for i in 0..100 {
tracker.mark_global_block_dirty(i * 4096, i as u64).unwrap();
}
let result = tracker.spill_oldest_blocks(50);
assert!(result.is_ok());
let overflow_store = tracker.get_overflow_store();
assert_eq!(overflow_store.unwrap().len(), 50);
let (_, global_count) = tracker.get_statistics();
assert_eq!(global_count, 50);
}
#[test]
fn test_promote_to_hierarchical_with_metadata() {
    // Blocks that carry cluster-affinity metadata can be promoted out of
    // the flat global set into per-cluster tracking.
    let mut tracker = DirtyBlockTracker::with_overflow_strategy(
        100,
        100,
        DirtyBlockOverflowStrategy::HierarchicalPromotion,
    );
    for idx in 0..50u64 {
        let offset = idx * 4096;
        tracker.mark_global_block_dirty(offset, idx).unwrap();
        // Point every block at cluster 1 so promotion has somewhere to go.
        tracker.block_metadata.insert(
            offset,
            BlockMetadata {
                size: 4096,
                cluster_key: Some(1),
                block_type: V2BlockType::NodeRecord,
                access_pattern: AccessPattern::Sequential,
                priority: 1,
            },
        );
    }
    assert!(tracker.promote_to_hierarchical().is_ok());
    let (cluster_blocks, global_blocks) = tracker.get_statistics();
    assert!(cluster_blocks > 0, "Should have cluster blocks");
    assert!(global_blocks < 50, "Global count should be reduced");
}
#[test]
fn test_promote_to_hierarchical_without_metadata_fails() {
    // With no cluster-affinity metadata there is nothing to promote on,
    // so promotion must fail with a descriptive error.
    let mut tracker = DirtyBlockTracker::with_overflow_strategy(
        100,
        100,
        DirtyBlockOverflowStrategy::HierarchicalPromotion,
    );
    for idx in 0..50u64 {
        tracker.mark_global_block_dirty(idx * 4096, idx).unwrap();
    }
    let outcome = tracker.promote_to_hierarchical();
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().message.contains("cluster-affinity"));
}
#[test]
fn test_spill_to_disk_without_store_fails() {
    // Requesting a spill before enable_spill_to_disk() was ever called
    // must fail, and the error should name the missing overflow store.
    let mut tracker = DirtyBlockTracker::with_overflow_strategy(
        100,
        100,
        DirtyBlockOverflowStrategy::SpillToDisk,
    );
    for idx in 0..100u64 {
        tracker.mark_global_block_dirty(idx * 4096, idx).unwrap();
    }
    let outcome = tracker.spill_oldest_blocks(10);
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().message.contains("overflow store"));
}
#[test]
fn test_checkpoint_manager_overflow_strategy() {
    // The checkpoint manager forwards the tracker's overflow strategy
    // through its own getter/setter pair; the default is Reject.
    let temp_dir = tempdir().unwrap();
    let graph_path = temp_dir.path().join("test.v2");
    let _graph_file =
        GraphFile::create(&graph_path).expect("Failed to create test V2 graph file");
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let manager = V2WALCheckpointManager::create(
        config,
        CheckpointStrategy::TimeInterval(Duration::from_secs(60)),
    )
    .unwrap();
    assert_eq!(
        manager.get_overflow_strategy(),
        DirtyBlockOverflowStrategy::Reject
    );
    manager.set_overflow_strategy(DirtyBlockOverflowStrategy::ForceCheckpoint);
    assert_eq!(
        manager.get_overflow_strategy(),
        DirtyBlockOverflowStrategy::ForceCheckpoint
    );
}
#[test]
fn test_multi_file_checkpoint_manager_creation() -> CheckpointResult<()> {
    // Constructing a manager via with_multi_file() must enable multi-file
    // mode and retain the segment limits configured on the builder.
    let temp_dir = tempdir().unwrap();
    let graph_path = temp_dir.path().join("test.v2");
    let _graph_file = GraphFile::create(&graph_path).map_err(|e| {
        CheckpointError::v2_integration(format!("Failed to create test graph file: {}", e))
    })?;
    let checkpoint_dir = temp_dir.path().join("checkpoint");
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let multi_file_config = MultiFileCheckpointConfig::new(checkpoint_dir.clone())
        .with_max_segment_size(1024 * 1024)
        .with_max_segments(4);
    let manager = V2WALCheckpointManager::with_multi_file(
        config,
        CheckpointStrategy::TimeInterval(Duration::from_secs(60)),
        multi_file_config,
    )?;
    assert_eq!(manager.get_state(), CheckpointState::Idle);
    assert!(manager.is_multi_file_enabled());
    // The stored config must round-trip the builder settings.
    let retrieved = manager.get_multi_file_config();
    assert!(retrieved.is_some());
    let cfg = retrieved.unwrap();
    assert_eq!(cfg.max_segment_size, 1024 * 1024);
    assert_eq!(cfg.max_segments, 4);
    Ok(())
}
#[test]
fn test_multi_file_checkpoint_not_enabled_by_default() {
    // The plain create() constructor (as opposed to with_multi_file())
    // must leave multi-file checkpointing off and expose no config.
    let temp_dir = tempdir().unwrap();
    let graph_path = temp_dir.path().join("test.v2");
    let _graph_file =
        GraphFile::create(&graph_path).expect("Failed to create test V2 graph file");
    let config = V2WALConfig {
        wal_path: temp_dir.path().join("test.wal"),
        checkpoint_path: temp_dir.path().join("test.checkpoint"),
        max_wal_size: 64 * 1024 * 1024,
        buffer_size: 1024 * 1024,
        checkpoint_interval: 100,
        enable_compression: false,
        ..Default::default()
    };
    let manager = V2WALCheckpointManager::create(
        config,
        CheckpointStrategy::TimeInterval(Duration::from_secs(60)),
    )
    .unwrap();
    assert!(!manager.is_multi_file_enabled());
    assert!(manager.get_multi_file_config().is_none());
}
}