use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::v2::wal::checkpoint::errors::{CheckpointError, CheckpointResult};
use std::path::{Path, PathBuf};
/// Block granularity used when validating offsets into the V2 graph file.
///
/// Offsets handed to the flusher must be a multiple of this value, and a
/// block starting at an offset must end at or before the value reported by
/// `GraphFile::file_size`.
const V2_GRAPH_BLOCK_SIZE: u64 = 4096;
/// Validates block offsets against a V2 graph file and flushes that file.
///
/// Only the path is stored; the file itself is opened on each flush call.
#[derive(Debug, Clone)]
pub struct BlockFlusher {
    /// Location of the V2 graph file that flush calls open.
    v2_graph_path: PathBuf,
}
impl BlockFlusher {
    /// Creates a flusher for the V2 graph file located at `v2_graph_path`.
    ///
    /// The path is only stored here; the file is opened on each flush call.
    pub fn new(v2_graph_path: PathBuf) -> Self {
        Self { v2_graph_path }
    }

    /// Validates `block_offset` and then flushes the V2 graph file.
    ///
    /// The offset is used for validation only: once the checks pass, the
    /// method calls `GraphFile::flush` on a handle it has just opened.
    ///
    /// # Errors
    ///
    /// * A validation error if `block_offset` is not a multiple of
    ///   `V2_GRAPH_BLOCK_SIZE` (checked before the file is opened), or if the
    ///   block starting at `block_offset` does not fit inside the file. An
    ///   end offset that would overflow `u64` is treated as not fitting.
    /// * A V2-integration error if the file cannot be opened or its size
    ///   cannot be read.
    /// * An I/O error if the flush itself fails.
    pub fn flush_dirty_block(&self, block_offset: u64) -> CheckpointResult<()> {
        // Alignment needs no I/O, so reject bad offsets before touching the file.
        Self::ensure_aligned(block_offset)?;

        let mut graph_file = GraphFile::open(&self.v2_graph_path).map_err(|e| {
            CheckpointError::v2_integration(format!(
                "Failed to open V2 graph file for block flushing: {}",
                e
            ))
        })?;
        let file_size = graph_file.file_size().map_err(|e| {
            CheckpointError::v2_integration(format!("Failed to get V2 graph file size: {}", e))
        })?;

        Self::ensure_within_file(block_offset, file_size)?;

        graph_file.flush().map_err(|e| {
            CheckpointError::io(format!(
                "Failed to sync V2 graph file during block flush: {}",
                e
            ))
        })?;
        Ok(())
    }

    /// Validates every offset in `block_offsets` and then flushes the V2
    /// graph file once.
    ///
    /// Offsets are checked in ascending order, so when several are invalid
    /// the error describes the smallest offending offset. The file is opened
    /// and flushed even when `block_offsets` is empty.
    ///
    /// # Errors
    ///
    /// * A V2-integration error if the file cannot be opened or its size
    ///   cannot be read (this happens before any offset is validated).
    /// * A validation error for the first offset, in ascending order, that is
    ///   misaligned or whose block does not fit inside the file. An end
    ///   offset that would overflow `u64` is treated as not fitting.
    /// * An I/O error if the flush itself fails.
    pub fn flush_dirty_blocks(&self, block_offsets: &[u64]) -> CheckpointResult<()> {
        // Work on a sorted copy so the caller's slice is left untouched and
        // validation failures are reported for the lowest offset first.
        let mut sorted_blocks = block_offsets.to_vec();
        sorted_blocks.sort_unstable();

        let mut graph_file = GraphFile::open(&self.v2_graph_path).map_err(|e| {
            CheckpointError::v2_integration(format!(
                "Failed to open V2 graph file for batch block flushing: {}",
                e
            ))
        })?;
        let file_size = graph_file.file_size().map_err(|e| {
            CheckpointError::v2_integration(format!("Failed to get V2 graph file size: {}", e))
        })?;

        for &block_offset in &sorted_blocks {
            Self::ensure_aligned(block_offset)?;
            Self::ensure_within_file(block_offset, file_size)?;
        }

        graph_file.flush().map_err(|e| {
            CheckpointError::io(format!(
                "Failed to sync V2 graph file during batch block flush: {}",
                e
            ))
        })?;
        Ok(())
    }

    /// Returns the path of the V2 graph file this flusher operates on.
    pub fn v2_graph_path(&self) -> &Path {
        &self.v2_graph_path
    }

    /// Fails with a validation error unless `block_offset` is a multiple of
    /// `V2_GRAPH_BLOCK_SIZE`.
    fn ensure_aligned(block_offset: u64) -> CheckpointResult<()> {
        if block_offset % V2_GRAPH_BLOCK_SIZE == 0 {
            return Ok(());
        }
        Err(CheckpointError::validation(format!(
            "Block offset {} not aligned to V2 block size {}",
            block_offset, V2_GRAPH_BLOCK_SIZE
        )))
    }

    /// Fails with a validation error unless the block starting at
    /// `block_offset` ends at or before `file_size`.
    fn ensure_within_file(block_offset: u64, file_size: u64) -> CheckpointResult<()> {
        // `checked_add` keeps offsets close to `u64::MAX` from wrapping around
        // to a small end offset that would slip past the size comparison.
        match block_offset.checked_add(V2_GRAPH_BLOCK_SIZE) {
            Some(block_end) if block_end <= file_size => Ok(()),
            _ => Err(CheckpointError::validation(format!(
                "Block offset {} exceeds V2 graph file size {}",
                block_offset, file_size
            ))),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::v2::wal::checkpoint::errors::CheckpointErrorKind;
    use crate::backend::native::GraphFile;
    use tempfile::tempdir;

    /// When `result` is a validation error, asserts that its message contains
    /// `fragment`. Any other outcome is deliberately left unchecked here.
    fn check_validation_message(result: &CheckpointResult<()>, fragment: &str) {
        if let Err(error) = result {
            if matches!(error.kind, CheckpointErrorKind::Validation) {
                assert!(error.message.contains(fragment));
            }
        }
    }

    /// Creates a graph file at `path` and returns it together with the size
    /// it reports right after creation.
    fn create_test_v2_file_with_size_info(
        path: &std::path::Path,
    ) -> CheckpointResult<(GraphFile, u64)> {
        let created = GraphFile::create(path)?;
        let reported_size = created.file_size().map_err(|e| {
            CheckpointError::v2_integration(format!("Failed to get file size: {}", e))
        })?;
        Ok((created, reported_size))
    }

    /// The flusher hands back the exact path it was constructed with.
    #[test]
    fn test_block_flusher_creation() {
        let scratch = tempdir().unwrap();
        let expected_path = scratch.path().join("test.v2");

        let flusher = BlockFlusher::new(expected_path.clone());

        assert_eq!(flusher.v2_graph_path(), expected_path.as_path());
    }

    /// An offset that is not a multiple of the block size is rejected.
    #[test]
    fn test_block_flusher_invalid_offset() {
        let scratch = tempdir().unwrap();
        let graph_path = scratch.path().join("test.v2");
        // Keep the created file alive for the duration of the test.
        let _graph_file = GraphFile::create(&graph_path).unwrap();

        let outcome = BlockFlusher::new(graph_path).flush_dirty_block(100);

        assert!(outcome.is_err());
        check_validation_message(&outcome, "not aligned to V2 block size");
    }

    /// An aligned offset far past the end of a fresh file is rejected.
    #[test]
    fn test_block_flusher_offset_beyond_file() {
        let scratch = tempdir().unwrap();
        let graph_path = scratch.path().join("test.v2");
        // Keep the created file alive for the duration of the test.
        let _graph_file = GraphFile::create(&graph_path).unwrap();

        let outcome = BlockFlusher::new(graph_path).flush_dirty_block(100 * V2_GRAPH_BLOCK_SIZE);

        assert!(outcome.is_err());
        check_validation_message(&outcome, "exceeds V2 graph file size");
    }

    /// Flushing block 0 succeeds exactly when the fresh file holds at least
    /// one whole block.
    #[test]
    fn test_block_flusher_with_real_v2_file() -> CheckpointResult<()> {
        let scratch = tempdir().unwrap();
        let graph_path = scratch.path().join("test.v2");
        let (_graph_file, file_size) = create_test_v2_file_with_size_info(&graph_path)?;

        let outcome = BlockFlusher::new(graph_path).flush_dirty_block(0);

        if file_size < V2_GRAPH_BLOCK_SIZE {
            assert!(
                outcome.is_err(),
                "Expected failure for small file size {}",
                file_size
            );
        } else {
            assert!(
                outcome.is_ok(),
                "Should successfully flush first block for file size {}",
                file_size
            );
        }
        Ok(())
    }

    /// A batch of up to three in-range blocks flushes successfully; a file
    /// smaller than one block makes the batch fail.
    #[test]
    fn test_block_flusher_multiple_blocks() -> CheckpointResult<()> {
        let scratch = tempdir().unwrap();
        let graph_path = scratch.path().join("test.v2");
        let (_graph_file, file_size) = create_test_v2_file_with_size_info(&graph_path)?;
        let flusher = BlockFlusher::new(graph_path);

        // Leave the last whole block out and cap the batch at three offsets.
        let usable_blocks = (file_size / V2_GRAPH_BLOCK_SIZE).saturating_sub(1);
        let mut block_offsets: Vec<u64> = (0..usable_blocks.min(3))
            .map(|index| index * V2_GRAPH_BLOCK_SIZE)
            .collect();
        // Always exercise at least block 0, even for very small files.
        if block_offsets.is_empty() {
            block_offsets.push(0);
        }

        let outcome = flusher.flush_dirty_blocks(&block_offsets);

        if file_size < V2_GRAPH_BLOCK_SIZE {
            assert!(outcome.is_err(), "Expected failure for small file size");
        } else {
            assert!(
                outcome.is_ok(),
                "Should successfully flush {} blocks",
                block_offsets.len()
            );
        }
        Ok(())
    }
}