Skip to main content

sochdb_core/
block_storage.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! File Data Block Storage (Task 12)
19//!
20//! Integrates PayloadStore as SochFS data block backend:
21//! - O(1) append-only writes (no LSM compaction overhead)
22//! - Transparent compression (LZ4/ZSTD based on content type)
23//! - Block reference tracking for garbage collection
24//!
25//! ## Block Storage Architecture
26//!
27//! ```text
28//! ┌─────────────────────────────────────────────────────────┐
29//! │                      Inode Table                        │
30//! │  inode_id=7, blocks=[(off=0, len=4096, cmp=LZ4),       │
31//! │                      (off=4096, len=2048, cmp=ZSTD)]   │
32//! └─────────────────────────────────────────────────────────┘
33//!                            │
34//!                            ▼
35//! ┌─────────────────────────────────────────────────────────┐
36//! │                    PayloadStore                         │
37//! │  Offset 0:    [LZ4 header][compressed block 1]         │
38//! │  Offset 4096: [ZSTD header][compressed block 2]        │
39//! └─────────────────────────────────────────────────────────┘
40//! ```
41//!
42//! ## Architectural Improvements (Production Tasks)
43//!
44//! ### Task 1: Fixed-Layout BlockHeader
45//!
46//! Uses deterministic 17-byte fixed layout with explicit little-endian encoding:
47//! ```text
48//! Offset  Size  Field            Type
49//! 0       4     magic            [u8; 4] = "TBLK"
50//! 4       1     compression      u8
51//! 5       4     original_size    u32 (LE)
52//! 9       4     compressed_size  u32 (LE)
53//! 13      4     checksum         u32 (LE, CRC32)
54//! Total: 17 bytes
55//! ```
56//!
57//! ### Task 5: Error Propagation
58//!
59//! All serialization methods return `Result<T, SochDBError>` instead of
60//! using `unwrap_or_default()` which silently swallows errors.
61
62use byteorder::{ByteOrder, LittleEndian};
63use parking_lot::RwLock;
64use serde::{Deserialize, Serialize};
65use std::collections::HashMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67
68use crate::{Result, SochDBError};
69
/// Default block size used when chunking file data (4 KiB)
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Upper bound on a single block's size accepted by `FileBlockManager` (1 MiB)
pub const MAX_BLOCK_SIZE: usize = 1024 * 1024;
75
/// Compression type for blocks
///
/// The discriminant values are persisted on disk (see `BlockRef` and
/// `BlockHeader`), so they must remain stable across versions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum BlockCompression {
    /// No compression (payload stored verbatim)
    None = 0,
    /// LZ4 (fast compression, moderate ratio)
    Lz4 = 1,
    /// ZSTD (slower, higher compression ratio)
    Zstd = 2,
}
87
88impl BlockCompression {
89    pub fn from_byte(b: u8) -> Self {
90        match b {
91            1 => BlockCompression::Lz4,
92            2 => BlockCompression::Zstd,
93            _ => BlockCompression::None,
94        }
95    }
96
97    pub fn to_byte(&self) -> u8 {
98        *self as u8
99    }
100}
101
/// Block reference stored in inode
///
/// Everything needed to locate, validate and decode one block inside the
/// payload store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockRef {
    /// Byte offset of the block (header + payload) in the payload store
    pub store_offset: u64,
    /// Stored payload length in bytes (equals `original_len` when the
    /// raw fallback was used)
    pub compressed_len: u32,
    /// Uncompressed length in bytes
    pub original_len: u32,
    /// Compression type of the stored payload
    pub compression: BlockCompression,
    /// CRC32 of the stored (possibly compressed) payload bytes
    pub checksum: u32,
}
116
117/// Fixed size of BlockRef when serialized (21 bytes)
118/// Layout: offset(8) + compressed_len(4) + original_len(4) + compression(1) + checksum(4)
119impl BlockRef {
120    /// Fixed serialization size
121    pub const SERIALIZED_SIZE: usize = 21;
122
123    /// Serialize to fixed-layout bytes (Task 5: returns Result)
124    pub fn to_bytes(&self) -> Result<[u8; Self::SERIALIZED_SIZE]> {
125        let mut buf = [0u8; Self::SERIALIZED_SIZE];
126        LittleEndian::write_u64(&mut buf[0..8], self.store_offset);
127        LittleEndian::write_u32(&mut buf[8..12], self.compressed_len);
128        LittleEndian::write_u32(&mut buf[12..16], self.original_len);
129        buf[16] = self.compression as u8;
130        LittleEndian::write_u32(&mut buf[17..21], self.checksum);
131        Ok(buf)
132    }
133
134    /// Deserialize from fixed-layout bytes
135    pub fn from_bytes(data: &[u8]) -> Result<Self> {
136        if data.len() < Self::SERIALIZED_SIZE {
137            return Err(SochDBError::Serialization(format!(
138                "BlockRef too short: {} < {}",
139                data.len(),
140                Self::SERIALIZED_SIZE
141            )));
142        }
143
144        Ok(Self {
145            store_offset: LittleEndian::read_u64(&data[0..8]),
146            compressed_len: LittleEndian::read_u32(&data[8..12]),
147            original_len: LittleEndian::read_u32(&data[12..16]),
148            compression: BlockCompression::from_byte(data[16]),
149            checksum: LittleEndian::read_u32(&data[17..21]),
150        })
151    }
152}
153
/// Block header in payload store (Task 1: Fixed 17-byte layout)
///
/// Layout (all integers little-endian):
/// ```text
/// Offset  Size  Field            
/// 0       4     magic = "TBLK"   
/// 4       1     compression      
/// 5       4     original_size    
/// 9       4     compressed_size  
/// 13      4     checksum (CRC32)
/// Total: 17 bytes
/// ```
#[derive(Debug, Clone)]
pub struct BlockHeader {
    /// Magic bytes ("TBLK"), used to validate block boundaries on read
    pub magic: [u8; 4],
    /// Compression type as its on-disk byte (see `BlockCompression`)
    pub compression: u8,
    /// Uncompressed payload size in bytes
    pub original_size: u32,
    /// Stored payload size in bytes
    pub compressed_size: u32,
    /// CRC32 checksum of the stored payload
    pub checksum: u32,
}
179
180impl BlockHeader {
181    pub const MAGIC: [u8; 4] = *b"TBLK";
182    pub const SIZE: usize = 17; // Fixed layout size
183
184    /// Serialize to fixed 17-byte layout (Task 1: deterministic serialization)
185    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
186        let mut buf = [0u8; Self::SIZE];
187        buf[0..4].copy_from_slice(&Self::MAGIC);
188        buf[4] = self.compression;
189        LittleEndian::write_u32(&mut buf[5..9], self.original_size);
190        LittleEndian::write_u32(&mut buf[9..13], self.compressed_size);
191        LittleEndian::write_u32(&mut buf[13..17], self.checksum);
192        buf
193    }
194
195    /// Deserialize from 17-byte buffer (Task 1: validates magic)
196    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
197        if buf.len() < Self::SIZE {
198            return Err(SochDBError::Corruption(format!(
199                "BlockHeader too short: {} < {}",
200                buf.len(),
201                Self::SIZE
202            )));
203        }
204
205        if buf[0..4] != Self::MAGIC {
206            return Err(SochDBError::Corruption(format!(
207                "Invalid block magic: expected {:?}, got {:?}",
208                Self::MAGIC,
209                &buf[0..4]
210            )));
211        }
212
213        Ok(Self {
214            magic: Self::MAGIC,
215            compression: buf[4],
216            original_size: LittleEndian::read_u32(&buf[5..9]),
217            compressed_size: LittleEndian::read_u32(&buf[9..13]),
218            checksum: LittleEndian::read_u32(&buf[13..17]),
219        })
220    }
221}
222
/// File block storage backed by append-only store
///
/// Thread-safe: the data buffer, index and ref counts each have their own
/// lock, and write regions are reserved through an atomic counter.
pub struct BlockStore {
    /// Data storage (append-only byte buffer holding headers + payloads)
    data: RwLock<Vec<u8>>,
    /// Next write offset (atomically reserved by writers)
    next_offset: AtomicU64,
    /// Block index: store offset -> BlockRef
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Reference counts for GC (store offset -> count)
    ref_counts: RwLock<HashMap<u64, u32>>,
}
234
235impl BlockStore {
236    /// Create a new block store
237    pub fn new() -> Self {
238        Self {
239            data: RwLock::new(Vec::new()),
240            next_offset: AtomicU64::new(0),
241            index: RwLock::new(HashMap::new()),
242            ref_counts: RwLock::new(HashMap::new()),
243        }
244    }
245
246    /// Write a block with automatic compression selection
247    ///
248    /// Returns BlockRef for the written block.
249    pub fn write_block(&self, data: &[u8]) -> Result<BlockRef> {
250        let compression = self.select_compression(data);
251        self.write_block_with_compression(data, compression)
252    }
253
254    /// Write a block with specified compression
255    pub fn write_block_with_compression(
256        &self,
257        data: &[u8],
258        compression: BlockCompression,
259    ) -> Result<BlockRef> {
260        // Compress data
261        let compressed = self.compress(data, compression)?;
262
263        // Calculate checksum
264        let checksum = crc32fast::hash(&compressed);
265
266        // Allocate offset
267        let header_size = BlockHeader::SIZE;
268        let total_size = header_size + compressed.len();
269        let offset = self
270            .next_offset
271            .fetch_add(total_size as u64, Ordering::SeqCst);
272
273        // Create header
274        let header = BlockHeader {
275            magic: BlockHeader::MAGIC,
276            compression: compression as u8,
277            original_size: data.len() as u32,
278            compressed_size: compressed.len() as u32,
279            checksum,
280        };
281
282        // Write to store
283        {
284            let mut store = self.data.write();
285            store.resize((offset + total_size as u64) as usize, 0);
286
287            // Write header (Task 1: fixed-layout serialization)
288            let header_bytes = header.to_bytes();
289            store[offset as usize..offset as usize + header_size].copy_from_slice(&header_bytes);
290
291            // Write data
292            store[offset as usize + header_size..offset as usize + total_size]
293                .copy_from_slice(&compressed);
294        }
295
296        // Create block reference
297        let block_ref = BlockRef {
298            store_offset: offset,
299            compressed_len: compressed.len() as u32,
300            original_len: data.len() as u32,
301            compression,
302            checksum,
303        };
304
305        // Update index
306        self.index.write().insert(offset, block_ref.clone());
307
308        // Initialize ref count
309        self.ref_counts.write().insert(offset, 1);
310
311        Ok(block_ref)
312    }
313
314    /// Read a block
315    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
316        let offset = block_ref.store_offset as usize;
317        let header_size = BlockHeader::SIZE;
318        let total_size = header_size + block_ref.compressed_len as usize;
319
320        // Read from store
321        let compressed = {
322            let store = self.data.read();
323            if offset + total_size > store.len() {
324                return Err(SochDBError::NotFound("Block not found".into()));
325            }
326
327            // Verify header (Task 1: use fixed-layout deserialization)
328            let header_bytes = &store[offset..offset + header_size];
329            let _header = BlockHeader::from_bytes(header_bytes)?;
330
331            // Read data
332            store[offset + header_size..offset + total_size].to_vec()
333        };
334
335        // Verify checksum
336        let checksum = crc32fast::hash(&compressed);
337        if checksum != block_ref.checksum {
338            return Err(SochDBError::Corruption("Block checksum mismatch".into()));
339        }
340
341        // Decompress
342        self.decompress(
343            &compressed,
344            block_ref.compression,
345            block_ref.original_len as usize,
346        )
347    }
348
349    /// Select compression based on data content
350    fn select_compression(&self, data: &[u8]) -> BlockCompression {
351        if data.len() < 128 {
352            return BlockCompression::None; // Too small to compress
353        }
354
355        // Detect content type
356        if is_soch_content(data) {
357            BlockCompression::Zstd // TOON is repetitive, good for ZSTD
358        } else if is_json_content(data) || is_compressible(data) {
359            BlockCompression::Lz4 // JSON and compressible content: fast compression
360        } else {
361            BlockCompression::None // Binary/random data
362        }
363    }
364
365    /// Compress data with fallback to raw on compression failure or expansion
366    ///
367    /// Returns the compressed data or original data if:
368    /// - Compression fails
369    /// - Compressed size >= original size (compression expanded the data)
370    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
371        match compression {
372            BlockCompression::None => Ok(data.to_vec()),
373            BlockCompression::Lz4 => {
374                match lz4::block::compress(data, None, false) {
375                    Ok(compressed) => {
376                        // Fallback to raw if compression didn't help
377                        if compressed.len() >= data.len() {
378                            Ok(data.to_vec())
379                        } else {
380                            Ok(compressed)
381                        }
382                    }
383                    Err(_) => {
384                        // Fallback to raw on compression failure
385                        Ok(data.to_vec())
386                    }
387                }
388            }
389            BlockCompression::Zstd => {
390                // Default compression level (3) for balance of speed/ratio
391                match zstd::encode_all(data, 3) {
392                    Ok(compressed) => {
393                        // Fallback to raw if compression didn't help
394                        if compressed.len() >= data.len() {
395                            Ok(data.to_vec())
396                        } else {
397                            Ok(compressed)
398                        }
399                    }
400                    Err(_) => {
401                        // Fallback to raw on compression failure
402                        Ok(data.to_vec())
403                    }
404                }
405            }
406        }
407    }
408
409    /// Decompress data with automatic format detection
410    fn decompress(
411        &self,
412        data: &[u8],
413        compression: BlockCompression,
414        original_size: usize,
415    ) -> Result<Vec<u8>> {
416        match compression {
417            BlockCompression::None => Ok(data.to_vec()),
418            BlockCompression::Lz4 => {
419                // If data matches original size, it's uncompressed (fallback case)
420                if data.len() == original_size {
421                    return Ok(data.to_vec());
422                }
423
424                lz4::block::decompress(data, Some(original_size as i32)).map_err(|e| {
425                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
426                })
427            }
428            BlockCompression::Zstd => {
429                // If data matches original size, it's uncompressed (fallback case)
430                if data.len() == original_size {
431                    return Ok(data.to_vec());
432                }
433
434                zstd::decode_all(data).map_err(|e| {
435                    SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
436                })
437            }
438        }
439    }
440
441    /// Increment reference count
442    pub fn add_ref(&self, offset: u64) {
443        let mut refs = self.ref_counts.write();
444        *refs.entry(offset).or_insert(0) += 1;
445    }
446
447    /// Decrement reference count
448    pub fn release_ref(&self, offset: u64) -> bool {
449        let mut refs = self.ref_counts.write();
450        if let Some(count) = refs.get_mut(&offset) {
451            *count = count.saturating_sub(1);
452            return *count == 0;
453        }
454        false
455    }
456
457    /// Get storage statistics
458    pub fn stats(&self) -> BlockStoreStats {
459        let data = self.data.read();
460        let index = self.index.read();
461
462        let mut total_original = 0u64;
463        let mut total_compressed = 0u64;
464
465        for block_ref in index.values() {
466            total_original += block_ref.original_len as u64;
467            total_compressed += block_ref.compressed_len as u64;
468        }
469
470        BlockStoreStats {
471            total_bytes: data.len() as u64,
472            block_count: index.len(),
473            total_original_bytes: total_original,
474            total_compressed_bytes: total_compressed,
475            compression_ratio: if total_compressed > 0 {
476                total_original as f64 / total_compressed as f64
477            } else {
478                1.0
479            },
480        }
481    }
482}
483
484impl Default for BlockStore {
485    fn default() -> Self {
486        Self::new()
487    }
488}
489
/// Block store statistics
#[derive(Debug, Clone, Default)]
pub struct BlockStoreStats {
    /// Total bytes in store (headers + payloads)
    pub total_bytes: u64,
    /// Number of blocks currently indexed
    pub block_count: usize,
    /// Total original bytes (before compression)
    pub total_original_bytes: u64,
    /// Total compressed bytes (as stored)
    pub total_compressed_bytes: u64,
    /// Compression ratio (original / compressed; 1.0 when the store is empty)
    pub compression_ratio: f64,
}
504
/// Heuristic: does this buffer look like TOON-formatted content?
///
/// Accepts buffers of at least 10 bytes whose first 100 bytes contain all
/// three of `[`, `{` and `:` (TOON table syntax carries all of these).
pub fn is_soch_content(data: &[u8]) -> bool {
    if data.len() < 10 {
        return false;
    }

    // ASCII bytes always decode to themselves in (lossy) UTF-8, so scanning
    // raw bytes for these markers is equivalent to searching the decoded
    // string while avoiding the intermediate allocation.
    let sample = &data[..data.len().min(100)];
    [b'[', b'{', b':'].iter().all(|m| sample.contains(m))
}
516
/// Check if data looks like JSON
///
/// JSON values may be preceded by insignificant whitespace (RFC 8259), so
/// leading ASCII whitespace is skipped before inspecting the first
/// structural character (`{` for objects, `[` for arrays). Empty or
/// all-whitespace buffers are not JSON.
pub fn is_json_content(data: &[u8]) -> bool {
    data.iter()
        .find(|&&b| !matches!(b, b' ' | b'\t' | b'\n' | b'\r'))
        .map_or(false, |&b| b == b'{' || b == b'[')
}
527
/// Heuristic: is this buffer likely to benefit from compression?
///
/// Samples up to the first 256 bytes and counts distinct byte values;
/// fewer than 50% distinct values suggests repetitive, compressible data.
/// Buffers under 64 bytes are never considered compressible.
pub fn is_compressible(data: &[u8]) -> bool {
    if data.len() < 64 {
        return false;
    }

    let sample = &data[..data.len().min(256)];
    let mut seen = [false; 256];
    let mut unique = 0usize;

    for &b in sample {
        // `replace` returns the previous flag: count only first sightings.
        if !std::mem::replace(&mut seen[b as usize], true) {
            unique += 1;
        }
    }

    unique < sample.len() / 2
}
549
/// File block manager - higher level interface for file I/O
///
/// Splits file data into fixed-size chunks and stores each chunk as one
/// block in the underlying `BlockStore`.
pub struct FileBlockManager {
    /// Block store holding the chunk payloads
    store: BlockStore,
    /// Chunk size used by `write_file`
    block_size: usize,
}
557
558impl FileBlockManager {
559    /// Create new file block manager
560    pub fn new(block_size: usize) -> Self {
561        Self {
562            store: BlockStore::new(),
563            block_size: block_size.min(MAX_BLOCK_SIZE),
564        }
565    }
566
567    /// Write file data, returns block references
568    pub fn write_file(&self, data: &[u8]) -> Result<Vec<BlockRef>> {
569        let mut blocks = Vec::new();
570
571        for chunk in data.chunks(self.block_size) {
572            let block_ref = self.store.write_block(chunk)?;
573            blocks.push(block_ref);
574        }
575
576        Ok(blocks)
577    }
578
579    /// Read file data from block references
580    pub fn read_file(&self, blocks: &[BlockRef]) -> Result<Vec<u8>> {
581        let mut data = Vec::new();
582
583        for block_ref in blocks {
584            let block_data = self.store.read_block(block_ref)?;
585            data.extend(block_data);
586        }
587
588        Ok(data)
589    }
590
591    /// Get underlying store stats
592    pub fn stats(&self) -> BlockStoreStats {
593        self.store.stats()
594    }
595}
596
597impl Default for FileBlockManager {
598    fn default() -> Self {
599        Self::new(DEFAULT_BLOCK_SIZE)
600    }
601}
602
603// ============================================================================
604// Durable Block Store with WAL (Task 2: Persistent Block Storage)
605// ============================================================================
606
607use std::fs::{File, OpenOptions};
608use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
609use std::path::{Path, PathBuf};
610
/// Block-storage WAL record types (distinct on-disk format from transaction WAL)
///
/// NOTE: This enum is intentionally separate from `crate::txn::WalRecordType`
/// because block_storage uses its own fixed 33-byte header format with
/// incompatible discriminant values. Do NOT unify without a migration path.
/// The discriminants below are persisted on disk and must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum BlockWalRecordType {
    /// Payload-carrying block write
    BlockWrite = 1,
    /// Checkpoint marker
    Checkpoint = 2,
    /// Transaction commit marker
    Commit = 3,
    /// Transaction begin marker
    TxnBegin = 4,
}

impl BlockWalRecordType {
    /// Decode a record-type tag; `None` for unknown discriminants so that
    /// corrupt records surface as errors instead of being misread.
    fn from_byte(b: u8) -> Option<Self> {
        Some(match b {
            1 => Self::BlockWrite,
            2 => Self::Checkpoint,
            3 => Self::Commit,
            4 => Self::TxnBegin,
            _ => return None,
        })
    }
}
640
/// WAL record header (fixed 33-byte layout)
///
/// Layout:
/// ```text
/// Offset  Size  Field
/// 0       8     lsn (Log Sequence Number)
/// 8       8     txn_id
/// 16      1     record_type
/// 17      8     page_id
/// 25      4     data_len
/// 29      4     crc32 (checksum of header + data)
/// Total: 33 bytes
/// ```
#[derive(Debug, Clone)]
pub struct WalRecordHeader {
    /// Log sequence number: the record's byte offset in the WAL file
    pub lsn: u64,
    /// Transaction this record belongs to
    pub txn_id: u64,
    /// Kind of record (block write, checkpoint, commit, txn begin)
    pub record_type: BlockWalRecordType,
    /// Page targeted by a block write (0 for marker records)
    pub page_id: u64,
    /// Length of the payload following the header, in bytes
    pub data_len: u32,
    /// CRC32 over the 29-byte header prefix plus the payload
    pub crc32: u32,
}
663
664impl WalRecordHeader {
665    pub const SIZE: usize = 33;
666
667    /// Serialize to bytes
668    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
669        let mut buf = [0u8; Self::SIZE];
670        LittleEndian::write_u64(&mut buf[0..8], self.lsn);
671        LittleEndian::write_u64(&mut buf[8..16], self.txn_id);
672        buf[16] = self.record_type as u8;
673        LittleEndian::write_u64(&mut buf[17..25], self.page_id);
674        LittleEndian::write_u32(&mut buf[25..29], self.data_len);
675        LittleEndian::write_u32(&mut buf[29..33], self.crc32);
676        buf
677    }
678
679    /// Deserialize from bytes
680    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
681        if buf.len() < Self::SIZE {
682            return Err(SochDBError::Corruption(format!(
683                "WAL record header too short: {} < {}",
684                buf.len(),
685                Self::SIZE
686            )));
687        }
688
689        let record_type = BlockWalRecordType::from_byte(buf[16]).ok_or_else(|| {
690            SochDBError::Corruption(format!("Invalid WAL record type: {}", buf[16]))
691        })?;
692
693        Ok(Self {
694            lsn: LittleEndian::read_u64(&buf[0..8]),
695            txn_id: LittleEndian::read_u64(&buf[8..16]),
696            record_type,
697            page_id: LittleEndian::read_u64(&buf[17..25]),
698            data_len: LittleEndian::read_u32(&buf[25..29]),
699            crc32: LittleEndian::read_u32(&buf[29..33]),
700        })
701    }
702
703    /// Compute CRC32 for header + data
704    pub fn compute_crc32(&self, data: &[u8]) -> u32 {
705        let mut hasher = crc32fast::Hasher::new();
706
707        // Hash header fields (excluding crc32 itself)
708        let mut header_buf = [0u8; 29];
709        LittleEndian::write_u64(&mut header_buf[0..8], self.lsn);
710        LittleEndian::write_u64(&mut header_buf[8..16], self.txn_id);
711        header_buf[16] = self.record_type as u8;
712        LittleEndian::write_u64(&mut header_buf[17..25], self.page_id);
713        LittleEndian::write_u32(&mut header_buf[25..29], self.data_len);
714
715        hasher.update(&header_buf);
716        hasher.update(data);
717        hasher.finalize()
718    }
719}
720
/// WAL writer for durable writes
pub struct WalWriter {
    /// Buffered WAL file handle (opened in append mode)
    file: BufWriter<File>,
    /// Next LSN to assign (byte offset of the next record in the file)
    next_lsn: u64,
    /// Path to WAL file
    #[allow(dead_code)]
    path: PathBuf,
}
731
732impl WalWriter {
733    /// Open or create WAL file
734    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
735        let path = path.as_ref().to_path_buf();
736
737        // Open file for append
738        let file = OpenOptions::new()
739            .create(true)
740            .read(true)
741            .append(true)
742            .open(&path)?;
743
744        // Get current file size for next_lsn
745        let metadata = file.metadata()?;
746        let next_lsn = metadata.len();
747
748        Ok(Self {
749            file: BufWriter::new(file),
750            next_lsn,
751            path,
752        })
753    }
754
755    /// Append a WAL record
756    pub fn append(
757        &mut self,
758        txn_id: u64,
759        record_type: BlockWalRecordType,
760        page_id: u64,
761        data: &[u8],
762    ) -> Result<u64> {
763        let lsn = self.next_lsn;
764
765        let mut header = WalRecordHeader {
766            lsn,
767            txn_id,
768            record_type,
769            page_id,
770            data_len: data.len() as u32,
771            crc32: 0, // Will be filled in
772        };
773
774        // Compute CRC32
775        header.crc32 = header.compute_crc32(data);
776
777        // Write header
778        let header_bytes = header.to_bytes();
779        self.file.write_all(&header_bytes)?;
780
781        // Write data
782        self.file.write_all(data)?;
783
784        // Update next LSN
785        self.next_lsn += WalRecordHeader::SIZE as u64 + data.len() as u64;
786
787        Ok(lsn)
788    }
789
790    /// Sync WAL to disk (fsync)
791    pub fn sync(&mut self) -> Result<()> {
792        self.file.flush()?;
793        self.file.get_ref().sync_all()?;
794        Ok(())
795    }
796
797    /// Get current LSN
798    pub fn current_lsn(&self) -> u64 {
799        self.next_lsn
800    }
801}
802
/// WAL reader for recovery
pub struct WalReader {
    /// Buffered handle positioned at the next unread record
    reader: BufReader<File>,
    #[allow(dead_code)]
    path: PathBuf,
}
809
810impl WalReader {
811    /// Open WAL file for reading
812    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
813        let path = path.as_ref().to_path_buf();
814        let file = File::open(&path)?;
815
816        Ok(Self {
817            reader: BufReader::new(file),
818            path,
819        })
820    }
821
822    /// Read next WAL record
823    pub fn read_next(&mut self) -> Result<Option<(WalRecordHeader, Vec<u8>)>> {
824        // Read header
825        let mut header_buf = [0u8; WalRecordHeader::SIZE];
826        match self.reader.read_exact(&mut header_buf) {
827            Ok(()) => {}
828            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
829            Err(e) => return Err(e.into()),
830        }
831
832        let header = WalRecordHeader::from_bytes(&header_buf)?;
833
834        // Read data
835        let mut data = vec![0u8; header.data_len as usize];
836        self.reader.read_exact(&mut data)?;
837
838        // Verify CRC32
839        let computed_crc = header.compute_crc32(&data);
840        if computed_crc != header.crc32 {
841            return Err(SochDBError::Corruption(format!(
842                "WAL CRC mismatch at LSN {}: expected {:#x}, got {:#x}",
843                header.lsn, header.crc32, computed_crc
844            )));
845        }
846
847        Ok(Some((header, data)))
848    }
849
850    /// Iterate over all records
851    pub fn iter(&mut self) -> WalIterator<'_> {
852        WalIterator { reader: self }
853    }
854}
855
/// Iterator over WAL records
///
/// Yields `Ok((header, payload))` per record; read/corruption failures
/// are surfaced as `Err` items.
pub struct WalIterator<'a> {
    reader: &'a mut WalReader,
}
860
861impl<'a> Iterator for WalIterator<'a> {
862    type Item = Result<(WalRecordHeader, Vec<u8>)>;
863
864    fn next(&mut self) -> Option<Self::Item> {
865        match self.reader.read_next() {
866            Ok(Some(record)) => Some(Ok(record)),
867            Ok(None) => None,
868            Err(e) => Some(Err(e)),
869        }
870    }
871}
872
/// Durable block store with WAL-backed persistence
///
/// Provides ACID guarantees through write-ahead logging:
/// - Writes are first logged to WAL, then synced
/// - On crash, replays WAL to recover state
/// - Checkpoint mechanism to truncate WAL
pub struct DurableBlockStore {
    /// In-memory block store (cache)
    store: BlockStore,
    /// WAL writer (serialized through the mutex; callers fsync per append)
    wal: parking_lot::Mutex<WalWriter>,
    /// Data file for persistent blocks (written during checkpoint)
    data_file: parking_lot::Mutex<File>,
    /// Dirty pages not yet flushed to data file (page_id -> raw data)
    dirty_pages: RwLock<HashMap<u64, Vec<u8>>>,
    /// Checkpoint LSN (WAL can be truncated before this)
    checkpoint_lsn: AtomicU64,
    /// Path to data directory
    data_dir: PathBuf,
    /// Next page ID to assign to a block write
    next_page_id: AtomicU64,
}
895
896impl DurableBlockStore {
897    /// Open or create a durable block store
898    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
899        let data_dir = data_dir.as_ref().to_path_buf();
900
901        // Create directory if needed
902        std::fs::create_dir_all(&data_dir)?;
903
904        let wal_path = data_dir.join("wal.log");
905        let data_path = data_dir.join("blocks.dat");
906
907        // Open WAL
908        let wal = WalWriter::open(&wal_path)?;
909
910        // Open data file
911        let data_file = OpenOptions::new()
912            .create(true)
913            .read(true)
914            .write(true)
915            .truncate(false)
916            .open(&data_path)?;
917
918        let store = Self {
919            store: BlockStore::new(),
920            wal: parking_lot::Mutex::new(wal),
921            data_file: parking_lot::Mutex::new(data_file),
922            dirty_pages: RwLock::new(HashMap::new()),
923            checkpoint_lsn: AtomicU64::new(0),
924            data_dir,
925            next_page_id: AtomicU64::new(0),
926        };
927
928        Ok(store)
929    }
930
931    /// Write a block with WAL durability
932    ///
933    /// 1. Writes to WAL and fsyncs
934    /// 2. Updates in-memory store
935    /// 3. Marks page as dirty (will be flushed at checkpoint)
936    pub fn write_block(&self, txn_id: u64, data: &[u8]) -> Result<BlockRef> {
937        // Allocate page ID
938        let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);
939
940        // Write to WAL first
941        {
942            let mut wal = self.wal.lock();
943            wal.append(txn_id, BlockWalRecordType::BlockWrite, page_id, data)?;
944            wal.sync()?; // Durability point
945        }
946
947        // Write to in-memory store
948        let block_ref = self.store.write_block(data)?;
949
950        // Mark as dirty (for checkpoint flushing)
951        self.dirty_pages.write().insert(page_id, data.to_vec());
952
953        Ok(block_ref)
954    }
955
956    /// Read a block
957    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
958        self.store.read_block(block_ref)
959    }
960
961    /// Commit a transaction
962    pub fn commit(&self, txn_id: u64) -> Result<u64> {
963        let mut wal = self.wal.lock();
964        let lsn = wal.append(txn_id, BlockWalRecordType::Commit, 0, &[])?;
965        wal.sync()?; // Durability point for commit
966        Ok(lsn)
967    }
968
969    /// Create a checkpoint
970    ///
971    /// 1. Flushes all dirty pages to data file
972    /// 2. Writes checkpoint marker to WAL
973    /// 3. Updates checkpoint LSN (WAL can be truncated before this)
974    pub fn checkpoint(&self) -> Result<u64> {
975        // Collect dirty pages
976        let dirty: Vec<(u64, Vec<u8>)> = {
977            let mut pages = self.dirty_pages.write();
978            pages.drain().collect()
979        };
980
981        // Flush to data file
982        {
983            let mut file = self.data_file.lock();
984            for (page_id, data) in &dirty {
985                let offset = *page_id * (DEFAULT_BLOCK_SIZE as u64 + BlockHeader::SIZE as u64);
986                file.seek(SeekFrom::Start(offset))?;
987                file.write_all(data)?;
988            }
989            file.sync_all()?;
990        }
991
992        // Write checkpoint marker
993        let lsn = {
994            let mut wal = self.wal.lock();
995            let lsn = wal.append(0, BlockWalRecordType::Checkpoint, 0, &[])?;
996            wal.sync()?;
997            lsn
998        };
999
1000        // Update checkpoint LSN
1001        self.checkpoint_lsn.store(lsn, Ordering::SeqCst);
1002
1003        Ok(lsn)
1004    }
1005
1006    /// Recover from WAL after crash
1007    ///
1008    /// Replays all committed transactions from the last checkpoint.
1009    pub fn recover(&mut self) -> Result<RecoveryStats> {
1010        let wal_path = self.data_dir.join("wal.log");
1011
1012        if !wal_path.exists() {
1013            return Ok(RecoveryStats::default());
1014        }
1015
1016        let mut reader = WalReader::open(&wal_path)?;
1017        let mut stats = RecoveryStats::default();
1018
1019        // Track active transactions
1020        let mut pending_txns: HashMap<u64, Vec<(u64, Vec<u8>)>> = HashMap::new();
1021        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();
1022
1023        // Read all WAL records
1024        for record_result in reader.iter() {
1025            let (header, data) = record_result?;
1026            stats.records_read += 1;
1027
1028            match header.record_type {
1029                BlockWalRecordType::BlockWrite => {
1030                    pending_txns
1031                        .entry(header.txn_id)
1032                        .or_default()
1033                        .push((header.page_id, data));
1034                }
1035                BlockWalRecordType::Commit => {
1036                    committed_txns.insert(header.txn_id);
1037                    stats.txns_committed += 1;
1038                }
1039                BlockWalRecordType::Checkpoint => {
1040                    self.checkpoint_lsn.store(header.lsn, Ordering::SeqCst);
1041                    stats.checkpoints_found += 1;
1042                }
1043                BlockWalRecordType::TxnBegin => {
1044                    // Just track
1045                }
1046            }
1047        }
1048
1049        // Redo committed transactions
1050        for txn_id in &committed_txns {
1051            if let Some(writes) = pending_txns.remove(txn_id) {
1052                for (page_id, data) in writes {
1053                    self.store.write_block(&data)?;
1054                    self.next_page_id.fetch_max(page_id + 1, Ordering::SeqCst);
1055                    stats.blocks_recovered += 1;
1056                }
1057            }
1058        }
1059
1060        // Count aborted transactions (uncommitted)
1061        stats.txns_aborted = pending_txns.len();
1062
1063        Ok(stats)
1064    }
1065
1066    /// Get statistics
1067    pub fn stats(&self) -> DurableBlockStoreStats {
1068        let store_stats = self.store.stats();
1069        DurableBlockStoreStats {
1070            block_stats: store_stats,
1071            dirty_page_count: self.dirty_pages.read().len(),
1072            checkpoint_lsn: self.checkpoint_lsn.load(Ordering::SeqCst),
1073            wal_size: self.wal.lock().current_lsn(),
1074        }
1075    }
1076}
1077
/// Statistics gathered while replaying the WAL after a crash.
#[derive(Debug, Default, Clone)]
pub struct RecoveryStats {
    /// Total WAL records scanned during replay
    pub records_read: usize,
    /// Transactions whose Commit record was found in the log
    pub txns_committed: usize,
    /// Transactions with writes but no Commit record (discarded)
    pub txns_aborted: usize,
    /// Block writes re-applied from committed transactions
    pub blocks_recovered: usize,
    /// Checkpoint markers encountered in the log
    pub checkpoints_found: usize,
}
1092
1093/// Durable block store statistics
1094#[derive(Debug, Clone)]
1095pub struct DurableBlockStoreStats {
1096    /// Block store stats
1097    pub block_stats: BlockStoreStats,
1098    /// Number of dirty pages (not yet flushed)
1099    pub dirty_page_count: usize,
1100    /// Last checkpoint LSN
1101    pub checkpoint_lsn: u64,
1102    /// Current WAL size (bytes)
1103    pub wal_size: u64,
1104}
1105
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic write → read round-trip through the in-memory BlockStore.
    #[test]
    fn test_block_store_write_read() {
        let store = BlockStore::new();

        let data = b"Hello, SochFS block storage!";
        let block_ref = store.write_block(data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);
    }

    /// Content-type heuristics for choosing a compression algorithm.
    #[test]
    fn test_compression_selection() {
        let store = BlockStore::new();

        // Small data: no compression
        let small = b"hi";
        assert_eq!(store.select_compression(small), BlockCompression::None);

        // TOON-like content (NOTE: compression stub returns None for now)
        let toon = b"users[5]{id,name}:\n1,Alice\n2,Bob\n3,Charlie";
        // In production with zstd, this would return Zstd
        let compression = store.select_compression(toon);
        assert!(compression == BlockCompression::Zstd || compression == BlockCompression::None);

        // JSON content (NOTE: compression stub returns None for now)
        let json = br#"{"users": [{"id": 1, "name": "Alice"}]}"#;
        // In production with lz4, this would return Lz4
        let compression = store.select_compression(json);
        assert!(compression == BlockCompression::Lz4 || compression == BlockCompression::None);
    }

    /// Data integrity when LZ4 compression is explicitly requested.
    #[test]
    fn test_lz4_compression() {
        let store = BlockStore::new();

        let data = "Hello, world! ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Lz4)
            .unwrap();

        // NOTE: With stub compression, compressed_len == original_len
        // In production with lz4, compressed_len < original_len
        // For now, just verify data integrity
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    /// Data integrity when ZSTD compression is explicitly requested.
    #[test]
    fn test_zstd_compression() {
        let store = BlockStore::new();

        let data = "TOON format is very repetitive ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Zstd)
            .unwrap();

        // NOTE: With stub compression, compressed_len == original_len
        // In production with zstd, compressed_len < original_len
        // For now, just verify data integrity
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    /// Splitting a file larger than the block size into multiple blocks
    /// and reassembling it losslessly.
    #[test]
    fn test_file_block_manager() {
        let manager = FileBlockManager::new(1024);

        let data = "Test data ".repeat(500); // ~5KB, multiple blocks
        let blocks = manager.write_file(data.as_bytes()).unwrap();

        assert!(blocks.len() > 1); // Should have multiple blocks

        let read_data = manager.read_file(&blocks).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    /// Block-count and compression-ratio accounting after a single write.
    #[test]
    fn test_stats() {
        let store = BlockStore::new();

        // Write some compressible data
        let data = "Repetitive data pattern ".repeat(100);
        store.write_block(data.as_bytes()).unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        // NOTE: With stub compression, ratio is 1.0
        // In production, ratio would be > 1.0 for compressible data
        assert!(stats.compression_ratio >= 1.0);
    }

    // ========================================================================
    // Task 1: Fixed-Layout BlockHeader Tests
    // ========================================================================

    /// Byte-exact serialized layout: 17 bytes, magic + compression tag +
    /// two little-endian u32 sizes + little-endian u32 checksum.
    #[test]
    fn test_block_header_fixed_layout() {
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Zstd as u8,
            original_size: 4096,
            compressed_size: 1024,
            checksum: 0xDEADBEEF,
        };

        let bytes = header.to_bytes();

        // Verify exact 17-byte size
        assert_eq!(bytes.len(), 17);

        // Verify magic at offset 0-3
        assert_eq!(&bytes[0..4], b"TBLK");

        // Verify compression at offset 4
        assert_eq!(bytes[4], BlockCompression::Zstd as u8);

        // Verify original_size at offset 5-8 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[5..9]), 4096);

        // Verify compressed_size at offset 9-12 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[9..13]), 1024);

        // Verify checksum at offset 13-16 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[13..17]), 0xDEADBEEF);
    }

    /// to_bytes → from_bytes preserves every header field.
    #[test]
    fn test_block_header_roundtrip() {
        let original = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Lz4 as u8,
            original_size: 65536,
            compressed_size: 32768,
            checksum: 0x12345678,
        };

        let bytes = original.to_bytes();
        let recovered = BlockHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.original_size, original.original_size);
        assert_eq!(recovered.compressed_size, original.compressed_size);
        assert_eq!(recovered.checksum, original.checksum);
    }

    /// Parsing rejects a header whose magic bytes are wrong.
    #[test]
    fn test_block_header_invalid_magic() {
        let mut bytes = [0u8; 17];
        bytes[0..4].copy_from_slice(b"XXXX"); // Invalid magic

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid block magic"));
    }

    /// Parsing rejects input shorter than the fixed 17-byte layout.
    #[test]
    fn test_block_header_too_short() {
        let bytes = [0u8; 10]; // Only 10 bytes, need 17

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    // ========================================================================
    // Task 5: BlockRef Error Propagation Tests
    // ========================================================================

    /// Byte-exact serialized layout of BlockRef: 21 bytes, all
    /// little-endian fields at fixed offsets.
    #[test]
    fn test_block_ref_fixed_layout() {
        let block_ref = BlockRef {
            store_offset: 0x123456789ABCDEF0,
            compressed_len: 4096,
            original_len: 8192,
            compression: BlockCompression::Zstd,
            checksum: 0xCAFEBABE,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // Verify exact 21-byte size
        assert_eq!(bytes.len(), 21);

        // Verify offset at 0-7 (little-endian)
        assert_eq!(LittleEndian::read_u64(&bytes[0..8]), 0x123456789ABCDEF0);

        // Verify compressed_len at 8-11
        assert_eq!(LittleEndian::read_u32(&bytes[8..12]), 4096);

        // Verify original_len at 12-15
        assert_eq!(LittleEndian::read_u32(&bytes[12..16]), 8192);

        // Verify compression at 16
        assert_eq!(bytes[16], BlockCompression::Zstd as u8);

        // Verify checksum at 17-20
        assert_eq!(LittleEndian::read_u32(&bytes[17..21]), 0xCAFEBABE);
    }

    /// Round-trip at the extreme values of every field (u64::MAX/u32::MAX).
    #[test]
    fn test_block_ref_roundtrip() {
        let original = BlockRef {
            store_offset: u64::MAX, // Test large values
            compressed_len: u32::MAX,
            original_len: u32::MAX,
            compression: BlockCompression::None,
            checksum: u32::MAX,
        };

        let bytes = original.to_bytes().unwrap();
        let recovered = BlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.store_offset, original.store_offset);
        assert_eq!(recovered.compressed_len, original.compressed_len);
        assert_eq!(recovered.original_len, original.original_len);
        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.checksum, original.checksum);
    }

    /// Parsing a truncated BlockRef must fail with a descriptive error.
    #[test]
    fn test_block_ref_too_short() {
        let bytes = [0u8; 10]; // Only 10 bytes, need 21

        let result = BlockRef::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlockRef too short"));
    }

    /// Serialization must produce identical bytes on every platform
    /// (little-endian on the wire regardless of host endianness).
    #[test]
    fn test_cross_platform_compatibility() {
        // Test that serialization is deterministic regardless of platform
        let block_ref = BlockRef {
            store_offset: 0x0102030405060708,
            compressed_len: 0x0A0B0C0D,
            original_len: 0x0E0F1011,
            compression: BlockCompression::Lz4,
            checksum: 0x12131415,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // These exact byte values should be the same on any platform
        // (little-endian encoding)
        assert_eq!(bytes[0], 0x08); // LSB of offset
        assert_eq!(bytes[7], 0x01); // MSB of offset
        assert_eq!(bytes[8], 0x0D); // LSB of compressed_len
        assert_eq!(bytes[17], 0x15); // LSB of checksum
    }

    // ========================================================================
    // Task 2: LZ4/ZSTD Production Compression Tests
    // ========================================================================

    /// LZ4 round-trip on a repetitive (compressible) 4 KiB pattern.
    #[test]
    fn test_lz4_compression_roundtrip() {
        let store = BlockStore::new();

        // Create compressible data (repetitive pattern)
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        // Compressed output never exceeds the original here; equality is
        // allowed because the store may fall back to storing uncompressed.
        assert!(block_ref.compressed_len <= block_ref.original_len);
    }

    /// ZSTD round-trip on all-zero data; expects >2x size reduction.
    #[test]
    fn test_zstd_compression_roundtrip() {
        let store = BlockStore::new();

        // Create highly compressible data (all zeros)
        let data = vec![0u8; 8192];

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        // Verify compression was very effective
        assert!(block_ref.compressed_len < block_ref.original_len / 2);
    }

    /// Incompressible data must still round-trip correctly (the store may
    /// fall back to storing it uncompressed).
    #[test]
    fn test_compression_fallback_on_incompressible() {
        let store = BlockStore::new();

        // Create random-looking data (incompressible)
        let mut data = vec![0u8; 256];
        for (i, byte) in data.iter_mut().enumerate().take(256) {
            *byte = ((i * 17 + 31) % 256) as u8; // Pseudo-random
        }

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
    }

    /// write_block's automatic algorithm selection must round-trip both
    /// JSON-like and TOON-like payloads.
    #[test]
    fn test_automatic_compression_selection() {
        let store = BlockStore::new();

        // Test JSON content (should select LZ4)
        let json_data = br#"{"name": "test", "value": 123, "items": [1, 2, 3]}"#.repeat(10);
        let json_ref = store.write_block(&json_data).unwrap();
        let json_recovered = store.read_block(&json_ref).unwrap();
        assert_eq!(json_recovered, json_data);

        // Test TOON content (should select ZSTD)
        let mut soch_data = vec![0u8; 256];
        soch_data[0..4].copy_from_slice(b"TOON"); // Magic header
        let soch_ref = store.write_block(&soch_data).unwrap();
        let soch_recovered = store.read_block(&soch_ref).unwrap();
        assert_eq!(soch_recovered, soch_data);
    }

    /// Payloads below the compression threshold are stored uncompressed.
    #[test]
    fn test_small_data_no_compression() {
        let store = BlockStore::new();

        // Data smaller than 128 bytes should not be compressed
        let small_data = vec![42u8; 64];
        let block_ref = store.write_block(&small_data).unwrap();

        // Should be stored uncompressed
        assert_eq!(block_ref.compression, BlockCompression::None);

        let recovered = store.read_block(&block_ref).unwrap();
        assert_eq!(recovered, small_data);
    }

    /// Stats must reflect a compression ratio > 1.0 for compressible data.
    #[test]
    fn test_compression_stats() {
        let store = BlockStore::new();

        // Write compressible data
        let data = vec![0u8; 4096];
        store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(
            stats.compression_ratio > 1.0,
            "Compression should reduce size"
        );
        assert!(stats.total_original_bytes > stats.total_compressed_bytes);
    }

    // ========================================================================
    // Task 2: Durable Block Store with WAL Tests
    // ========================================================================

    /// WAL record header serializes and parses back field-for-field.
    #[test]
    fn test_wal_record_header_roundtrip() {
        let original = WalRecordHeader {
            lsn: 12345,
            txn_id: 67890,
            record_type: BlockWalRecordType::BlockWrite,
            page_id: 42,
            data_len: 4096,
            crc32: 0xDEADBEEF,
        };

        let bytes = original.to_bytes();
        let recovered = WalRecordHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, original.lsn);
        assert_eq!(recovered.txn_id, original.txn_id);
        assert_eq!(recovered.record_type, original.record_type);
        assert_eq!(recovered.page_id, original.page_id);
        assert_eq!(recovered.data_len, original.data_len);
        assert_eq!(recovered.crc32, original.crc32);
    }

    /// CRC32 must be deterministic and sensitive to payload changes.
    #[test]
    fn test_wal_crc32() {
        let header = WalRecordHeader {
            lsn: 100,
            txn_id: 1,
            record_type: BlockWalRecordType::BlockWrite,
            page_id: 0,
            data_len: 4,
            crc32: 0,
        };

        let data = b"test";
        let crc1 = header.compute_crc32(data);
        let crc2 = header.compute_crc32(data);

        assert_eq!(crc1, crc2, "CRC should be deterministic");

        // Different data should produce different CRC
        let different_data = b"TEST";
        let crc3 = header.compute_crc32(different_data);
        assert_ne!(crc1, crc3, "Different data should have different CRC");
    }

    /// End-to-end write/read/commit against the durable store; the written
    /// page stays dirty until a checkpoint flushes it.
    #[test]
    fn test_durable_block_store_basic() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        // Write a block
        let data = b"Hello, durable block store!";
        let block_ref = store.write_block(1, data).unwrap();

        // Read it back
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);

        // Commit the transaction
        store.commit(1).unwrap();

        // Check stats
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 1);
    }

    /// Checkpoint flushes all dirty pages and records the checkpoint LSN.
    #[test]
    fn test_durable_block_store_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        // Write some blocks
        store.write_block(1, b"block1").unwrap();
        store.write_block(1, b"block2").unwrap();
        store.write_block(1, b"block3").unwrap();
        store.commit(1).unwrap();

        // Checkpoint should flush dirty pages
        let checkpoint_lsn = store.checkpoint().unwrap();
        assert!(checkpoint_lsn > 0);

        // Dirty pages should be cleared
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 0);
        assert_eq!(stats.checkpoint_lsn, checkpoint_lsn);
    }

    /// Crash after commit but before checkpoint: recovery must replay the
    /// committed transaction from the WAL.
    #[test]
    fn test_durable_block_store_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Phase 1: Write and commit
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"data1").unwrap();
            store.write_block(1, b"data2").unwrap();
            store.commit(1).unwrap();

            // Don't checkpoint - simulate crash before flush
        }

        // Phase 2: Recover
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            // Should have recovered the committed transaction
            assert_eq!(stats.txns_committed, 1);
            assert_eq!(stats.blocks_recovered, 2);
            assert_eq!(stats.txns_aborted, 0);
        }
    }

    /// Crash before commit: recovery must discard the transaction's writes.
    #[test]
    fn test_durable_block_store_uncommitted_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Phase 1: Write but don't commit (simulate crash)
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"uncommitted_data").unwrap();
            // NO commit - transaction should be aborted on recovery
        }

        // Phase 2: Recover
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            // Uncommitted transaction should be aborted
            assert_eq!(stats.txns_committed, 0);
            assert_eq!(stats.txns_aborted, 1);
            assert_eq!(stats.blocks_recovered, 0);
        }
    }

    /// Records appended by WalWriter are read back by WalReader in order,
    /// with types and payloads intact.
    #[test]
    fn test_wal_writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write records
        {
            let mut writer = WalWriter::open(&wal_path).unwrap();
            writer.append(1, BlockWalRecordType::TxnBegin, 0, &[]).unwrap();
            writer
                .append(1, BlockWalRecordType::BlockWrite, 0, b"data1")
                .unwrap();
            writer
                .append(1, BlockWalRecordType::BlockWrite, 1, b"data2")
                .unwrap();
            writer.append(1, BlockWalRecordType::Commit, 0, &[]).unwrap();
            writer.sync().unwrap();
        }

        // Read records
        {
            let mut reader = WalReader::open(&wal_path).unwrap();
            let mut records = Vec::new();
            for record in reader.iter() {
                records.push(record.unwrap());
            }

            assert_eq!(records.len(), 4);
            assert_eq!(records[0].0.record_type, BlockWalRecordType::TxnBegin);
            assert_eq!(records[1].0.record_type, BlockWalRecordType::BlockWrite);
            assert_eq!(records[1].1, b"data1");
            assert_eq!(records[2].0.record_type, BlockWalRecordType::BlockWrite);
            assert_eq!(records[2].1, b"data2");
            assert_eq!(records[3].0.record_type, BlockWalRecordType::Commit);
        }
    }
}