// lcpfs 2026.1.102
//
// LCP File System - A ZFS-inspired copy-on-write filesystem for Rust
// Copyright 2025 LunaOS Contributors
// SPDX-License-Identifier: Apache-2.0
//
// IO Pipeline
// Compression, dedup, encryption, and disk operations.

use crate::BLOCK_DEVICES;
use crate::LcpfsCrypto;
use crate::cache::arc::ARC;
use crate::compress::compress::Compress;
use crate::dedup::dedup::DDT;
use crate::fscore::impl_::LcpfsController;
use crate::fscore::structs::{Blkptr, Dva};
use crate::integrity::checksum::Checksum;
use crate::util::alloc::METASLAB;
use crate::{FsError, FsResult};
use alloc::vec;
use alloc::vec::Vec;

/// I/O pipeline for compression, checksumming, and encryption.
///
/// Stateless namespace: every method is an associated function operating on
/// the crate-level singletons (`BLOCK_DEVICES`, `ARC`, `DDT`, `METASLAB`).
pub struct Pipeline;

impl Pipeline {
    /// Whether RAID-Z1 striping is available.
    ///
    /// RAID-Z1 needs at least three attached block devices
    /// (two data + one parity).
    fn is_raid_mode() -> bool {
        let device_count = BLOCK_DEVICES.lock().len();
        device_count >= 3
    }

    /// Write raw data to disk at the given byte offset (single-disk mode).
    ///
    /// Handles offsets/lengths that are not 512-byte aligned: the previous
    /// version ignored `offset % 512` (misplacing the data) and zero-filled
    /// the unused tail of a partial final sector (clobbering adjacent
    /// on-disk bytes). Partial sectors are now read-modify-written so the
    /// bytes we do not own are preserved. Sector-aligned, full-sector
    /// writes behave exactly as before.
    ///
    /// # Arguments
    ///
    /// * `offset` - Absolute byte offset on device 0
    /// * `data` - Bytes to persist
    fn write_to_disk_single(offset: u64, data: &[u8]) -> Result<(), &'static str> {
        let mut devices = BLOCK_DEVICES.lock();
        let dev = devices.get_mut(0).ok_or("No block device")?;

        let mut sector_idx = (offset / 512) as usize;
        // Byte position inside the first sector (non-zero for unaligned writes).
        let mut intra = (offset % 512) as usize;
        let mut remaining = data;

        while !remaining.is_empty() {
            let chunk_size = core::cmp::min(512 - intra, remaining.len());
            let mut sector_buf = [0u8; 512];

            // Partial sector: fetch the existing contents first so the bytes
            // outside [intra, intra + chunk_size) survive the write.
            if intra != 0 || chunk_size < 512 {
                dev.read_block(sector_idx, &mut sector_buf)?;
            }

            sector_buf[intra..intra + chunk_size].copy_from_slice(&remaining[..chunk_size]);
            dev.write_block(sector_idx, &sector_buf)?;

            remaining = &remaining[chunk_size..];
            sector_idx += 1;
            intra = 0; // only the first sector can start mid-sector
        }

        Ok(())
    }

    /// Write data using RAID-Z1 striping (3 disks: D0, D1, Parity).
    ///
    /// Devices 0 and 1 hold data; device 2 holds parity.
    fn write_to_disk_raid(offset: u64, data: &[u8]) -> Result<(), &'static str> {
        LcpfsController::new(0, 1, 2).write_data(offset, data)
    }

    /// Read raw data from disk at the given byte offset (single-disk mode).
    ///
    /// Handles offsets that are not 512-byte aligned: the previous version
    /// ignored `offset % 512`, so an unaligned read returned data shifted
    /// by the intra-sector offset. Sector-aligned reads behave exactly as
    /// before.
    ///
    /// # Arguments
    ///
    /// * `offset` - Absolute byte offset on device 0
    /// * `size` - Number of bytes to return
    fn read_from_disk_single(offset: u64, size: usize) -> Result<Vec<u8>, &'static str> {
        let mut devices = BLOCK_DEVICES.lock();
        let dev = devices.get_mut(0).ok_or("No block device")?;

        let start_sector = (offset / 512) as usize;
        // Byte position inside the first sector (non-zero for unaligned reads).
        let intra = (offset % 512) as usize;
        // Enough whole sectors to cover the head padding plus the payload.
        let sectors_needed = (intra + size).div_ceil(512);
        let mut raw = vec![0u8; sectors_needed * 512];

        for i in 0..sectors_needed {
            let mut sector_buf = [0u8; 512];
            dev.read_block(start_sector + i, &mut sector_buf)?;
            raw[i * 512..(i + 1) * 512].copy_from_slice(&sector_buf);
        }

        // Drop the head padding, then trim to the requested length.
        let mut result = raw.split_off(intra);
        result.truncate(size);
        Ok(result)
    }

    /// Read data using RAID-Z1 (with self-healing on checksum failure).
    ///
    /// Handles offsets that are not stripe-aligned: the previous version
    /// ignored `offset % 1024`, so a mid-stripe read returned shifted data.
    /// Stripe-aligned reads behave exactly as before.
    ///
    /// # Arguments
    ///
    /// * `offset` - Absolute byte offset across the array
    /// * `size` - Number of bytes to return
    fn read_from_disk_raid(offset: u64, size: usize) -> Result<Vec<u8>, &'static str> {
        let controller = LcpfsController::new(0, 1, 2);
        let stripe_size = 1024; // 2x512 data sectors per stripe
        let start_stripe = (offset / stripe_size as u64) as usize;
        // Byte position inside the first stripe (non-zero for unaligned reads).
        let intra = (offset % stripe_size as u64) as usize;
        let stripes_needed = (intra + size).div_ceil(stripe_size);

        let mut raw = Vec::with_capacity(stripes_needed * stripe_size);
        for i in 0..stripes_needed {
            let stripe_data = controller.read_stripe(start_stripe + i)?;
            raw.extend_from_slice(&stripe_data);
        }

        // Drop the head padding, then trim to the requested length.
        let mut result = raw.split_off(intra);
        result.truncate(size);
        Ok(result)
    }

    /// The Write Path: Data -> Compress -> Dedup -> Encrypt -> Allocate -> Write -> Cache
    ///
    /// # Arguments
    ///
    /// * `data` - Logical (plaintext, uncompressed) block contents
    /// * `key` - 32-byte encryption key
    /// * `txg` - Transaction group; fed into nonce derivation by `encrypt_block`
    ///
    /// Returns `(DVA, compression_type)` where compression_type: 0=none, 1=lz4.
    /// (The previous version also computed a checksum of `data` here and
    /// discarded it; callers that need one compute it themselves, e.g.
    /// `write_block_full`.)
    pub fn write_block(data: &[u8], key: &[u8; 32], txg: u64) -> FsResult<(Dva, u8)> {
        // 1. COMPRESSION (skip small blocks: header overhead tends to
        //    outweigh any savings below ~64 bytes)
        let (compressed_data, compression_type) = if data.len() >= 64 {
            let compressed = Compress::compress(data).unwrap_or_else(|| data.to_vec());
            // Only keep the compressed form if it actually saved space
            if compressed.len() < data.len() {
                (compressed, 1u8) // 1 = LZ4
            } else {
                (data.to_vec(), 0u8) // incompressible: store as-is
            }
        } else {
            (data.to_vec(), 0u8) // 0 = no compression
        };

        // 2. DEDUPLICATION CHECK — keyed on the *original* data, so the
        //    compression type computed above is deterministic for a hit
        //    (identical input compresses identically).
        {
            let mut ddt = DDT.lock();
            if let Some(existing_dva) = ddt.dedup(data) {
                // NOTE(review): the deduped block on disk was encrypted
                // under the nonce of *its* birth txg. A caller that stores
                // this `txg` in a new block pointer and later derives the
                // nonce from it may fail to decrypt — confirm that
                // `derive_nonce_from_bp` uses a field consistent with the
                // deduped block.
                return Ok((existing_dva, compression_type));
            }
        }

        // 3. ENCRYPTION (nonce derived from `txg` inside encrypt_block)
        let (encrypted_data, _nonce) = LcpfsCrypto::encrypt_block(key, &compressed_data, txg)
            .map_err(|_| FsError::EncryptionFailed)?;

        // 4. ALLOCATION (sized to the on-disk, post-encryption length)
        let dva: Dva = {
            let mut metaslab = METASLAB.lock();
            metaslab.allocate(encrypted_data.len() as u64)?
        };

        // 5. PHYSICAL DISK WRITE (RAID-Z1 or single-disk)
        let write_result = if Self::is_raid_mode() {
            Self::write_to_disk_raid(dva.offset, &encrypted_data)
        } else {
            Self::write_to_disk_single(dva.offset, &encrypted_data)
        };

        if let Err(e) = write_result {
            // Roll back the allocation so the space is not leaked
            METASLAB.lock().free(dva, encrypted_data.len() as u64);
            return Err(FsError::IoError { vdev: 0, reason: e });
        }

        // 6. REGISTER IN DEDUP TABLE (original data -> DVA)
        DDT.lock().register(data, dva);

        // 7. CACHE THE PLAINTEXT IN ARC so later reads skip
        //    decrypt/decompress entirely
        ARC.lock().cache(dva, data.to_vec());

        Ok((dva, compression_type))
    }

    /// The Read Path with the nonce derived from the block pointer.
    ///
    /// Prefer this over calling `read_block` directly in new code: it
    /// removes the possibility of supplying a nonce that does not match
    /// `bp`.
    pub fn read_block_auto_nonce(bp: &Blkptr, key: &[u8; 32]) -> FsResult<Vec<u8>> {
        Self::read_block(bp, key, &LcpfsCrypto::derive_nonce_from_bp(bp))
    }

    /// The Read Path: Cache Check -> Physical Read -> Decrypt -> Decompress -> Verify
    ///
    /// # Arguments
    ///
    /// * `bp` - Block pointer with DVA, checksum, and compression info
    /// * `key` - 32-byte encryption key
    /// * `nonce` - 12-byte nonce (use `derive_nonce_from_bp` or `read_block_auto_nonce`)
    ///
    /// # Errors
    ///
    /// Returns `ChecksumMismatch` (never corrupt data) when verification
    /// fails and RAID-Z1 self-healing is unavailable or unsuccessful.
    pub fn read_block(bp: &Blkptr, key: &[u8; 32], nonce: &[u8; 12]) -> FsResult<Vec<u8>> {
        let dva = bp.dva[0];

        // 1. ARC Check (Fast Path) — the cache holds verified plaintext
        {
            let mut arc = ARC.lock();
            if let Some(data) = arc.read(&dva) {
                return Ok(data);
            }
        }

        // 2. Physical Read (RAID-Z1 or single-disk)
        // The blkptr `padding` field doubles as the logical size; fall back
        // to 4 KiB when it is absent or implausible (> 128 KiB).
        let read_size = if bp.padding > 0 && bp.padding <= 131072 {
            bp.padding as usize
        } else {
            4096 // Default to 4KB (common block size)
        };

        let raw_data = if Self::is_raid_mode() {
            Self::read_from_disk_raid(dva.offset, read_size)
        } else {
            Self::read_from_disk_single(dva.offset, read_size)
        }
        .map_err(|e| FsError::IoError { vdev: 0, reason: e })?;

        // 3. Decryption
        let decrypted = LcpfsCrypto::decrypt_block(key, &raw_data, nonce)
            .map_err(|_| FsError::DecryptionFailed)?;

        // 4. Decompression (if the block pointer says it was compressed)
        let plaintext = Self::maybe_decompress(bp, decrypted);

        // 5. Checksum Verification - CRITICAL: fail on mismatch, don't return corrupt data
        let computed_checksum = Checksum::calculate(&plaintext);
        if !computed_checksum.matches(&bp.checksum) {
            crate::lcpfs_println!("[ PIPELINE ] Checksum mismatch at DVA {:x}", dva.offset);

            // Try RAID-Z1 self-healing if available
            if Self::is_raid_mode() {
                crate::lcpfs_println!("[ PIPELINE ] Attempting RAID-Z1 self-healing...");
                let controller = LcpfsController::new(0, 1, 2);
                // NOTE(review): this heals only the stripe containing the
                // block's start; a block spanning multiple 1 KiB stripes
                // cannot fully heal here — confirm block/stripe sizing.
                let stripe_idx = (dva.offset / 1024) as usize;
                if let Ok(healed) = controller.read_stripe_with_healing(stripe_idx) {
                    if let Ok(healed_decrypted) = LcpfsCrypto::decrypt_block(key, &healed, nonce) {
                        // BUG FIX: healed data must go through the same
                        // decompression step as the primary path; previously
                        // the (still-compressed) bytes were checksummed, so
                        // a compressed block could never pass healing.
                        let healed_plain = Self::maybe_decompress(bp, healed_decrypted);
                        let healed_checksum = Checksum::calculate(&healed_plain);
                        if healed_checksum.matches(&bp.checksum) {
                            crate::lcpfs_println!("[ PIPELINE ] Self-healing successful!");
                            // Cache the verified plaintext, consistent with
                            // what the main path stores in the ARC.
                            ARC.lock().cache(dva, healed_plain.clone());
                            return Ok(healed_plain);
                        }
                    }
                }
                crate::lcpfs_println!("[ PIPELINE ] Self-healing failed");
            }

            // CRITICAL FIX: Return error instead of corrupt data
            return Err(FsError::ChecksumMismatch {
                expected: bp.checksum,
                actual: computed_checksum.to_u64_array(),
            });
        }

        // 6. Update ARC with the verified plaintext
        {
            let mut arc = ARC.lock();
            arc.cache(dva, plaintext.clone());
        }

        Ok(plaintext)
    }

    /// Decompress `data` when the block pointer's compression flag (low byte
    /// of `flags_compression`) is set; on decompression failure the raw
    /// bytes are returned unchanged (best-effort, preserving the historical
    /// behavior — the checksum step catches garbage).
    fn maybe_decompress(bp: &Blkptr, data: Vec<u8>) -> Vec<u8> {
        if bp.flags_compression & 0xFF != 0 {
            Compress::decompress(&data).unwrap_or(data)
        } else {
            data
        }
    }

    /// Write a block and return a fully-populated block pointer.
    ///
    /// The checksum covers the *logical* (uncompressed, unencrypted) data,
    /// matching what `read_block` verifies after decrypt + decompress.
    pub fn write_block_full(data: &[u8], key: &[u8; 32], txg: u64) -> FsResult<Blkptr> {
        let checksum = Checksum::calculate(data);
        let (dva, compression_type) = Self::write_block(data, key, txg)?;

        let mut blkptr = Blkptr::zero();
        blkptr.birth_txg = txg;
        blkptr.dva[0] = dva;
        blkptr.fill_count = 1;
        blkptr.checksum = checksum.value;
        // `padding` doubles as the logical size consumed by the read path.
        blkptr.padding = data.len() as u64;
        // Low byte of flags_compression: 0 = none, 1 = LZ4.
        blkptr.flags_compression = u64::from(compression_type);

        Ok(blkptr)
    }

    /// Prefetch a set of blocks into the ARC.
    ///
    /// Errors are deliberately ignored: prefetch is best-effort and any
    /// failure will resurface on the real read. Holes are skipped.
    pub fn prefetch_blocks(bps: &[Blkptr], key: &[u8; 32], nonce: &[u8; 12]) {
        bps.iter().filter(|bp| !bp.is_hole()).for_each(|bp| {
            // Reading populates the ARC; the returned data is dropped.
            let _ = Self::read_block(bp, key, nonce);
        });
    }

    /// Sync dirty data to disk.
    ///
    /// Data blocks are written immediately by `write_block()`; metadata
    /// (dnodes, block pointers) is batched per transaction group, so this
    /// triggers a ZPL txg sync to persist it.
    ///
    /// # Returns
    ///
    /// The transaction group number that was synced, or an error if sync
    /// failed.
    pub fn sync_dirty() -> FsResult<u64> {
        use crate::storage::zpl::ZPL;

        // Snapshot ARC occupancy for the log line; the block scope releases
        // the ARC lock before the ZPL lock is taken.
        let cached_blocks = {
            let arc = ARC.lock();
            arc.t1.len() + arc.t2.len()
        };

        crate::lcpfs_println!(
            "[ PIPELINE ] Syncing (ARC has {} cached blocks)",
            cached_blocks
        );

        // Trigger the ZPL txg sync, which writes all dirty metadata to disk.
        // NOTE(review): the `0` argument is presumably an objset/pool id —
        // confirm against the ZPL API.
        let txg = ZPL.lock().txg_sync(0)?;

        crate::lcpfs_println!("[ PIPELINE ] Sync complete, TXG {}", txg);

        Ok(txg)
    }
}