sqlitegraph 2.0.7

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
//! Block Flusher for Checkpoint Operations
//!
//! This module provides block-level I/O operations for flushing dirty blocks
//! to V2 graph files during checkpoint operations.

use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::v2::wal::checkpoint::errors::{CheckpointError, CheckpointResult};
use std::path::{Path, PathBuf};

// V2 constants for block operations
const V2_GRAPH_BLOCK_SIZE: u64 = 4096; // 4KB block size for V2

/// Block flusher for V2 graph file dirty block management
pub struct BlockFlusher {
    v2_graph_path: PathBuf,
}

impl BlockFlusher {
    /// Create new block flusher for V2 graph file
    pub fn new(v2_graph_path: PathBuf) -> Self {
        Self { v2_graph_path }
    }

    /// Flush dirty block to V2 graph file using real backend operations
    pub fn flush_dirty_block(&self, block_offset: u64) -> CheckpointResult<()> {
        // Validate block offset alignment using V2 constants
        if block_offset % V2_GRAPH_BLOCK_SIZE != 0 {
            return Err(CheckpointError::validation(format!(
                "Block offset {} not aligned to V2 block size {}",
                block_offset, V2_GRAPH_BLOCK_SIZE
            )));
        }

        // Open V2 graph file for real block flushing
        let mut graph_file = GraphFile::open(&self.v2_graph_path).map_err(|e| {
            CheckpointError::v2_integration(format!(
                "Failed to open V2 graph file for block flushing: {}",
                e
            ))
        })?;

        // Validate file can accommodate the block offset
        let file_size = graph_file.file_size().map_err(|e| {
            CheckpointError::v2_integration(format!("Failed to get V2 graph file size: {}", e))
        })?;

        if block_offset + V2_GRAPH_BLOCK_SIZE > file_size {
            return Err(CheckpointError::validation(format!(
                "Block offset {} exceeds V2 graph file size {}",
                block_offset, file_size
            )));
        }

        // Perform real block flush operation
        // In V2, block flushing ensures all cached changes are written to disk
        graph_file.flush().map_err(|e| {
            CheckpointError::io(format!(
                "Failed to sync V2 graph file during block flush: {}",
                e
            ))
        })?;

        // Note: In a full implementation, this would also:
        // 1. Check if the specific block is dirty in cache
        // 2. Write only the dirty block if needed
        // 3. Update block metadata
        // 4. Ensure write-ahead logging consistency

        Ok(())
    }

    /// Flush multiple dirty blocks efficiently with real backend operations
    pub fn flush_dirty_blocks(&self, block_offsets: &[u64]) -> CheckpointResult<()> {
        // Sort blocks for sequential I/O when possible
        let mut sorted_blocks = block_offsets.to_vec();
        sorted_blocks.sort_unstable();

        // Open V2 graph file once for efficiency
        let mut graph_file = GraphFile::open(&self.v2_graph_path).map_err(|e| {
            CheckpointError::v2_integration(format!(
                "Failed to open V2 graph file for batch block flushing: {}",
                e
            ))
        })?;

        let file_size = graph_file.file_size().map_err(|e| {
            CheckpointError::v2_integration(format!("Failed to get V2 graph file size: {}", e))
        })?;

        // Validate all block offsets before processing
        for &block_offset in &sorted_blocks {
            if block_offset % V2_GRAPH_BLOCK_SIZE != 0 {
                return Err(CheckpointError::validation(format!(
                    "Block offset {} not aligned to V2 block size {}",
                    block_offset, V2_GRAPH_BLOCK_SIZE
                )));
            }

            if block_offset + V2_GRAPH_BLOCK_SIZE > file_size {
                return Err(CheckpointError::validation(format!(
                    "Block offset {} exceeds V2 graph file size {}",
                    block_offset, file_size
                )));
            }
        }

        // Perform real batch flush operation
        graph_file.flush().map_err(|e| {
            CheckpointError::io(format!(
                "Failed to sync V2 graph file during batch block flush: {}",
                e
            ))
        })?;

        Ok(())
    }

    /// Get V2 graph file path
    pub fn v2_graph_path(&self) -> &Path {
        &self.v2_graph_path
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::GraphFile;
    use tempfile::tempdir;

    #[test]
    fn test_block_flusher_creation() {
        let temp_dir = tempdir().unwrap();
        let v2_graph_path = temp_dir.path().join("test.v2");

        let flusher = BlockFlusher::new(v2_graph_path.clone());
        assert_eq!(flusher.v2_graph_path(), v2_graph_path.as_path());
    }

    #[test]
    fn test_block_flusher_invalid_offset() {
        let temp_dir = tempdir().unwrap();
        let v2_graph_path = temp_dir.path().join("test.v2");

        // Create a minimal V2 graph file for testing
        let _graph_file = GraphFile::create(&v2_graph_path).unwrap();

        let flusher = BlockFlusher::new(v2_graph_path);

        // Test non-aligned offset
        let result = flusher.flush_dirty_block(100); // Not aligned to 4KB
        assert!(result.is_err());

        if let Err(ref error) = result {
            if matches!(error.kind, crate::backend::native::v2::wal::checkpoint::errors::CheckpointErrorKind::Validation) {
                assert!(error.message.contains("not aligned to V2 block size"));
            }
        }
    }

    #[test]
    fn test_block_flusher_offset_beyond_file() {
        let temp_dir = tempdir().unwrap();
        let v2_graph_path = temp_dir.path().join("test.v2");

        // Create a minimal V2 graph file for testing
        let _graph_file = GraphFile::create(&v2_graph_path).unwrap();

        let flusher = BlockFlusher::new(v2_graph_path);

        // Test offset beyond file size
        let result = flusher.flush_dirty_block(100 * V2_GRAPH_BLOCK_SIZE); // Way beyond file
        assert!(result.is_err());

        if let Err(ref error) = result {
            if matches!(error.kind, crate::backend::native::v2::wal::checkpoint::errors::CheckpointErrorKind::Validation) {
                assert!(error.message.contains("exceeds V2 graph file size"));
            }
        }
    }

    /// Helper function to create a V2 graph file and return actual file size info
    fn create_test_v2_file_with_size_info(
        path: &std::path::Path,
    ) -> CheckpointResult<(GraphFile, u64)> {
        // Create base V2 graph file
        let graph_file = GraphFile::create(path)?;

        let file_size = graph_file.file_size().map_err(|e| {
            CheckpointError::v2_integration(format!("Failed to get file size: {}", e))
        })?;

        Ok((graph_file, file_size))
    }

    #[test]
    fn test_block_flusher_with_real_v2_file() -> CheckpointResult<()> {
        let temp_dir = tempdir().unwrap();
        let v2_graph_path = temp_dir.path().join("test.v2");

        // Create a real V2 graph file and check its size
        let (_graph_file, file_size) = create_test_v2_file_with_size_info(&v2_graph_path)?;

        let flusher = BlockFlusher::new(v2_graph_path.clone());

        // For minimal V2 files, we may not have any full blocks (4096 bytes)
        // Test with the smallest valid block offset, which is 0, but only if the file is large enough
        if file_size >= V2_GRAPH_BLOCK_SIZE {
            let result = flusher.flush_dirty_block(0);
            assert!(
                result.is_ok(),
                "Should successfully flush first block for file size {}",
                file_size
            );
        } else {
            // File is too small for any block operations - test this case
            let result = flusher.flush_dirty_block(0);
            assert!(
                result.is_err(),
                "Expected failure for small file size {}",
                file_size
            );
        }

        Ok(())
    }

    #[test]
    fn test_block_flusher_multiple_blocks() -> CheckpointResult<()> {
        let temp_dir = tempdir().unwrap();
        let v2_graph_path = temp_dir.path().join("test.v2");

        // Create a real V2 graph file and check its size
        let (_graph_file, file_size) = create_test_v2_file_with_size_info(&v2_graph_path)?;

        let flusher = BlockFlusher::new(v2_graph_path);

        // Calculate how many full blocks we can test with the actual file size
        let max_block_count = (file_size / V2_GRAPH_BLOCK_SIZE).saturating_sub(1); // Leave space for safety

        // Create block offsets that are within the file size
        let mut block_offsets = Vec::new();
        for i in 0..max_block_count.min(3) {
            // Test up to 3 blocks or what fits
            block_offsets.push(i * V2_GRAPH_BLOCK_SIZE);
        }

        // Ensure we have at least one block to test
        if block_offsets.is_empty() {
            // If file is too small, test with offset 0 and expect it to fail gracefully
            block_offsets.push(0);
        }

        let result = flusher.flush_dirty_blocks(&block_offsets);

        // Only assert success if we had realistic block offsets
        if file_size >= V2_GRAPH_BLOCK_SIZE {
            assert!(
                result.is_ok(),
                "Should successfully flush {} blocks",
                block_offsets.len()
            );
        } else {
            // File too small for block operations - this is expected for minimal test files
            assert!(result.is_err(), "Expected failure for small file size");
        }

        Ok(())
    }
}