// sochdb_core/block_storage.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! File Data Block Storage (Task 12)
19//!
20//! Integrates PayloadStore as SochFS data block backend:
21//! - O(1) append-only writes (no LSM compaction overhead)
22//! - Transparent compression (LZ4/ZSTD based on content type)
23//! - Block reference tracking for garbage collection
24//!
25//! ## Block Storage Architecture
26//!
27//! ```text
28//! ┌─────────────────────────────────────────────────────────┐
29//! │                      Inode Table                        │
30//! │  inode_id=7, blocks=[(off=0, len=4096, cmp=LZ4),       │
31//! │                      (off=4096, len=2048, cmp=ZSTD)]   │
32//! └─────────────────────────────────────────────────────────┘
33//!                            │
34//!                            ▼
35//! ┌─────────────────────────────────────────────────────────┐
36//! │                    PayloadStore                         │
37//! │  Offset 0:    [LZ4 header][compressed block 1]         │
38//! │  Offset 4096: [ZSTD header][compressed block 2]        │
39//! └─────────────────────────────────────────────────────────┘
40//! ```
41//!
42//! ## Architectural Improvements (Production Tasks)
43//!
44//! ### Task 1: Fixed-Layout BlockHeader
45//!
46//! Uses deterministic 17-byte fixed layout with explicit little-endian encoding:
47//! ```text
48//! Offset  Size  Field            Type
49//! 0       4     magic            [u8; 4] = "TBLK"
50//! 4       1     compression      u8
51//! 5       4     original_size    u32 (LE)
52//! 9       4     compressed_size  u32 (LE)
53//! 13      4     checksum         u32 (LE, CRC32)
54//! Total: 17 bytes
55//! ```
56//!
57//! ### Task 5: Error Propagation
58//!
59//! All serialization methods return `Result<T, SochDBError>` instead of
60//! using `unwrap_or_default()` which silently swallows errors.
61
62use byteorder::{ByteOrder, LittleEndian};
63use parking_lot::RwLock;
64use serde::{Deserialize, Serialize};
65use std::collections::HashMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67
68use crate::{Result, SochDBError};
69
/// Default block size used when chunking file data (4 KiB).
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Maximum supported block size (1 MiB); larger requested sizes are clamped.
pub const MAX_BLOCK_SIZE: usize = 1024 * 1024;
75
76/// Compression type for blocks
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
78#[repr(u8)]
79pub enum BlockCompression {
80    /// No compression
81    None = 0,
82    /// LZ4 (fast)
83    Lz4 = 1,
84    /// ZSTD (high ratio)
85    Zstd = 2,
86}
87
88impl BlockCompression {
89    pub fn from_byte(b: u8) -> Self {
90        match b {
91            1 => BlockCompression::Lz4,
92            2 => BlockCompression::Zstd,
93            _ => BlockCompression::None,
94        }
95    }
96
97    pub fn to_byte(&self) -> u8 {
98        *self as u8
99    }
100}
101
102/// Block reference stored in inode
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct BlockRef {
105    /// Offset in payload store
106    pub store_offset: u64,
107    /// Compressed length
108    pub compressed_len: u32,
109    /// Original length
110    pub original_len: u32,
111    /// Compression type
112    pub compression: BlockCompression,
113    /// Checksum
114    pub checksum: u32,
115}
116
117/// Fixed size of BlockRef when serialized (21 bytes)
118/// Layout: offset(8) + compressed_len(4) + original_len(4) + compression(1) + checksum(4)
119impl BlockRef {
120    /// Fixed serialization size
121    pub const SERIALIZED_SIZE: usize = 21;
122
123    /// Serialize to fixed-layout bytes (Task 5: returns Result)
124    pub fn to_bytes(&self) -> Result<[u8; Self::SERIALIZED_SIZE]> {
125        let mut buf = [0u8; Self::SERIALIZED_SIZE];
126        LittleEndian::write_u64(&mut buf[0..8], self.store_offset);
127        LittleEndian::write_u32(&mut buf[8..12], self.compressed_len);
128        LittleEndian::write_u32(&mut buf[12..16], self.original_len);
129        buf[16] = self.compression as u8;
130        LittleEndian::write_u32(&mut buf[17..21], self.checksum);
131        Ok(buf)
132    }
133
134    /// Deserialize from fixed-layout bytes
135    pub fn from_bytes(data: &[u8]) -> Result<Self> {
136        if data.len() < Self::SERIALIZED_SIZE {
137            return Err(SochDBError::Serialization(format!(
138                "BlockRef too short: {} < {}",
139                data.len(),
140                Self::SERIALIZED_SIZE
141            )));
142        }
143
144        Ok(Self {
145            store_offset: LittleEndian::read_u64(&data[0..8]),
146            compressed_len: LittleEndian::read_u32(&data[8..12]),
147            original_len: LittleEndian::read_u32(&data[12..16]),
148            compression: BlockCompression::from_byte(data[16]),
149            checksum: LittleEndian::read_u32(&data[17..21]),
150        })
151    }
152}
153
154/// Block header in payload store (Task 1: Fixed 17-byte layout)
155///
156/// Layout (all integers little-endian):
157/// ```text
158/// Offset  Size  Field            
159/// 0       4     magic = "TBLK"   
160/// 4       1     compression      
161/// 5       4     original_size    
162/// 9       4     compressed_size  
163/// 13      4     checksum (CRC32)
164/// Total: 17 bytes
165/// ```
166#[derive(Debug, Clone)]
167pub struct BlockHeader {
168    /// Magic bytes
169    pub magic: [u8; 4],
170    /// Compression type
171    pub compression: u8,
172    /// Original size
173    pub original_size: u32,
174    /// Compressed size
175    pub compressed_size: u32,
176    /// CRC32 checksum
177    pub checksum: u32,
178}
179
180impl BlockHeader {
181    pub const MAGIC: [u8; 4] = *b"TBLK";
182    pub const SIZE: usize = 17; // Fixed layout size
183
184    /// Serialize to fixed 17-byte layout (Task 1: deterministic serialization)
185    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
186        let mut buf = [0u8; Self::SIZE];
187        buf[0..4].copy_from_slice(&Self::MAGIC);
188        buf[4] = self.compression;
189        LittleEndian::write_u32(&mut buf[5..9], self.original_size);
190        LittleEndian::write_u32(&mut buf[9..13], self.compressed_size);
191        LittleEndian::write_u32(&mut buf[13..17], self.checksum);
192        buf
193    }
194
195    /// Deserialize from 17-byte buffer (Task 1: validates magic)
196    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
197        if buf.len() < Self::SIZE {
198            return Err(SochDBError::Corruption(format!(
199                "BlockHeader too short: {} < {}",
200                buf.len(),
201                Self::SIZE
202            )));
203        }
204
205        if buf[0..4] != Self::MAGIC {
206            return Err(SochDBError::Corruption(format!(
207                "Invalid block magic: expected {:?}, got {:?}",
208                Self::MAGIC,
209                &buf[0..4]
210            )));
211        }
212
213        Ok(Self {
214            magic: Self::MAGIC,
215            compression: buf[4],
216            original_size: LittleEndian::read_u32(&buf[5..9]),
217            compressed_size: LittleEndian::read_u32(&buf[9..13]),
218            checksum: LittleEndian::read_u32(&buf[13..17]),
219        })
220    }
221}
222
/// File block storage backed by an append-only in-memory byte store.
pub struct BlockStore {
    /// Raw block bytes; append-only (offsets are never reused).
    data: RwLock<Vec<u8>>,
    /// Next write offset; regions are reserved via atomic fetch_add.
    next_offset: AtomicU64,
    /// Block index: store offset -> block metadata.
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Per-offset reference counts used for garbage collection.
    ref_counts: RwLock<HashMap<u64, u32>>,
}
234
235impl BlockStore {
236    /// Create a new block store
237    pub fn new() -> Self {
238        Self {
239            data: RwLock::new(Vec::new()),
240            next_offset: AtomicU64::new(0),
241            index: RwLock::new(HashMap::new()),
242            ref_counts: RwLock::new(HashMap::new()),
243        }
244    }
245
246    /// Write a block with automatic compression selection
247    ///
248    /// Returns BlockRef for the written block.
249    pub fn write_block(&self, data: &[u8]) -> Result<BlockRef> {
250        let compression = self.select_compression(data);
251        self.write_block_with_compression(data, compression)
252    }
253
254    /// Write a block with specified compression
255    pub fn write_block_with_compression(
256        &self,
257        data: &[u8],
258        compression: BlockCompression,
259    ) -> Result<BlockRef> {
260        // Compress data
261        let compressed = self.compress(data, compression)?;
262
263        // Calculate checksum
264        let checksum = crc32fast::hash(&compressed);
265
266        // Allocate offset
267        let header_size = BlockHeader::SIZE;
268        let total_size = header_size + compressed.len();
269        let offset = self
270            .next_offset
271            .fetch_add(total_size as u64, Ordering::SeqCst);
272
273        // Create header
274        let header = BlockHeader {
275            magic: BlockHeader::MAGIC,
276            compression: compression as u8,
277            original_size: data.len() as u32,
278            compressed_size: compressed.len() as u32,
279            checksum,
280        };
281
282        // Write to store
283        {
284            let mut store = self.data.write();
285            store.resize((offset + total_size as u64) as usize, 0);
286
287            // Write header (Task 1: fixed-layout serialization)
288            let header_bytes = header.to_bytes();
289            store[offset as usize..offset as usize + header_size].copy_from_slice(&header_bytes);
290
291            // Write data
292            store[offset as usize + header_size..offset as usize + total_size]
293                .copy_from_slice(&compressed);
294        }
295
296        // Create block reference
297        let block_ref = BlockRef {
298            store_offset: offset,
299            compressed_len: compressed.len() as u32,
300            original_len: data.len() as u32,
301            compression,
302            checksum,
303        };
304
305        // Update index
306        self.index.write().insert(offset, block_ref.clone());
307
308        // Initialize ref count
309        self.ref_counts.write().insert(offset, 1);
310
311        Ok(block_ref)
312    }
313
314    /// Read a block
315    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
316        let offset = block_ref.store_offset as usize;
317        let header_size = BlockHeader::SIZE;
318        let total_size = header_size + block_ref.compressed_len as usize;
319
320        // Read from store
321        let compressed = {
322            let store = self.data.read();
323            if offset + total_size > store.len() {
324                return Err(SochDBError::NotFound("Block not found".into()));
325            }
326
327            // Verify header (Task 1: use fixed-layout deserialization)
328            let header_bytes = &store[offset..offset + header_size];
329            let _header = BlockHeader::from_bytes(header_bytes)?;
330
331            // Read data
332            store[offset + header_size..offset + total_size].to_vec()
333        };
334
335        // Verify checksum
336        let checksum = crc32fast::hash(&compressed);
337        if checksum != block_ref.checksum {
338            return Err(SochDBError::Corruption("Block checksum mismatch".into()));
339        }
340
341        // Decompress
342        self.decompress(
343            &compressed,
344            block_ref.compression,
345            block_ref.original_len as usize,
346        )
347    }
348
349    /// Select compression based on data content
350    fn select_compression(&self, data: &[u8]) -> BlockCompression {
351        if data.len() < 128 {
352            return BlockCompression::None; // Too small to compress
353        }
354
355        // Detect content type
356        if is_soch_content(data) {
357            BlockCompression::Zstd // TOON is repetitive, good for ZSTD
358        } else if is_json_content(data) || is_compressible(data) {
359            BlockCompression::Lz4 // JSON and compressible content: fast compression
360        } else {
361            BlockCompression::None // Binary/random data
362        }
363    }
364
365    /// Compress data with fallback to raw on compression failure or expansion
366    ///
367    /// Returns the compressed data or original data if:
368    /// - Compression fails
369    /// - Compressed size >= original size (compression expanded the data)
370    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
371        match compression {
372            BlockCompression::None => Ok(data.to_vec()),
373            BlockCompression::Lz4 => {
374                match lz4::block::compress(data, None, false) {
375                    Ok(compressed) => {
376                        // Fallback to raw if compression didn't help
377                        if compressed.len() >= data.len() {
378                            Ok(data.to_vec())
379                        } else {
380                            Ok(compressed)
381                        }
382                    }
383                    Err(_) => {
384                        // Fallback to raw on compression failure
385                        Ok(data.to_vec())
386                    }
387                }
388            }
389            BlockCompression::Zstd => {
390                // Default compression level (3) for balance of speed/ratio
391                match zstd::encode_all(data, 3) {
392                    Ok(compressed) => {
393                        // Fallback to raw if compression didn't help
394                        if compressed.len() >= data.len() {
395                            Ok(data.to_vec())
396                        } else {
397                            Ok(compressed)
398                        }
399                    }
400                    Err(_) => {
401                        // Fallback to raw on compression failure
402                        Ok(data.to_vec())
403                    }
404                }
405            }
406        }
407    }
408
409    /// Decompress data with automatic format detection
410    fn decompress(
411        &self,
412        data: &[u8],
413        compression: BlockCompression,
414        original_size: usize,
415    ) -> Result<Vec<u8>> {
416        match compression {
417            BlockCompression::None => Ok(data.to_vec()),
418            BlockCompression::Lz4 => {
419                // If data matches original size, it's uncompressed (fallback case)
420                if data.len() == original_size {
421                    return Ok(data.to_vec());
422                }
423
424                lz4::block::decompress(data, Some(original_size as i32)).map_err(|e| {
425                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
426                })
427            }
428            BlockCompression::Zstd => {
429                // If data matches original size, it's uncompressed (fallback case)
430                if data.len() == original_size {
431                    return Ok(data.to_vec());
432                }
433
434                zstd::decode_all(data).map_err(|e| {
435                    SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
436                })
437            }
438        }
439    }
440
441    /// Increment reference count
442    pub fn add_ref(&self, offset: u64) {
443        let mut refs = self.ref_counts.write();
444        *refs.entry(offset).or_insert(0) += 1;
445    }
446
447    /// Decrement reference count
448    pub fn release_ref(&self, offset: u64) -> bool {
449        let mut refs = self.ref_counts.write();
450        if let Some(count) = refs.get_mut(&offset) {
451            *count = count.saturating_sub(1);
452            return *count == 0;
453        }
454        false
455    }
456
457    /// Get storage statistics
458    pub fn stats(&self) -> BlockStoreStats {
459        let data = self.data.read();
460        let index = self.index.read();
461
462        let mut total_original = 0u64;
463        let mut total_compressed = 0u64;
464
465        for block_ref in index.values() {
466            total_original += block_ref.original_len as u64;
467            total_compressed += block_ref.compressed_len as u64;
468        }
469
470        BlockStoreStats {
471            total_bytes: data.len() as u64,
472            block_count: index.len(),
473            total_original_bytes: total_original,
474            total_compressed_bytes: total_compressed,
475            compression_ratio: if total_compressed > 0 {
476                total_original as f64 / total_compressed as f64
477            } else {
478                1.0
479            },
480        }
481    }
482}
483
484impl Default for BlockStore {
485    fn default() -> Self {
486        Self::new()
487    }
488}
489
/// Snapshot of block store statistics, as produced by `BlockStore::stats`.
#[derive(Debug, Clone, Default)]
pub struct BlockStoreStats {
    /// Total bytes held in the backing store (headers + payloads)
    pub total_bytes: u64,
    /// Number of indexed blocks
    pub block_count: usize,
    /// Sum of original (pre-compression) block sizes
    pub total_original_bytes: u64,
    /// Sum of stored (post-compression) block sizes
    pub total_compressed_bytes: u64,
    /// Compression ratio (original / compressed); 1.0 when the store is empty
    pub compression_ratio: f64,
}
504
/// Heuristic: does `data` look like TOON-format content?
///
/// Inspects at most the first 100 bytes (lossily decoded as UTF-8) and
/// requires all of '[', '{' and ':' to appear. Inputs shorter than 10 bytes
/// are never classified as TOON.
pub fn is_soch_content(data: &[u8]) -> bool {
    if data.len() < 10 {
        return false;
    }

    let window = &data[..data.len().min(100)];
    let text = String::from_utf8_lossy(window);
    ['[', '{', ':'].iter().all(|c| text.contains(*c))
}
516
/// Heuristic: does `data` look like JSON?
///
/// JSON documents start with '{' (object) or '[' (array); empty input is
/// never JSON.
pub fn is_json_content(data: &[u8]) -> bool {
    matches!(data.first(), Some(b'{') | Some(b'['))
}
527
/// Heuristic: is `data` likely to benefit from compression?
///
/// Samples the first `min(len, 256)` bytes and counts distinct byte values;
/// fewer than 50% distinct bytes suggests low entropy. Inputs under 64 bytes
/// are never considered compressible.
pub fn is_compressible(data: &[u8]) -> bool {
    if data.len() < 64 {
        return false;
    }

    let sample = &data[..data.len().min(256)];
    let mut seen = [false; 256];
    let mut distinct = 0usize;

    for &b in sample {
        let slot = &mut seen[b as usize];
        if !*slot {
            *slot = true;
            distinct += 1;
        }
    }

    distinct < sample.len() / 2
}
549
550/// File block manager - higher level interface for file I/O
551pub struct FileBlockManager {
552    /// Block store
553    store: BlockStore,
554    /// Block size
555    block_size: usize,
556}
557
558impl FileBlockManager {
559    /// Create new file block manager
560    pub fn new(block_size: usize) -> Self {
561        Self {
562            store: BlockStore::new(),
563            block_size: block_size.min(MAX_BLOCK_SIZE),
564        }
565    }
566
567    /// Write file data, returns block references
568    pub fn write_file(&self, data: &[u8]) -> Result<Vec<BlockRef>> {
569        let mut blocks = Vec::new();
570
571        for chunk in data.chunks(self.block_size) {
572            let block_ref = self.store.write_block(chunk)?;
573            blocks.push(block_ref);
574        }
575
576        Ok(blocks)
577    }
578
579    /// Read file data from block references
580    pub fn read_file(&self, blocks: &[BlockRef]) -> Result<Vec<u8>> {
581        let mut data = Vec::new();
582
583        for block_ref in blocks {
584            let block_data = self.store.read_block(block_ref)?;
585            data.extend(block_data);
586        }
587
588        Ok(data)
589    }
590
591    /// Get underlying store stats
592    pub fn stats(&self) -> BlockStoreStats {
593        self.store.stats()
594    }
595}
596
597impl Default for FileBlockManager {
598    fn default() -> Self {
599        Self::new(DEFAULT_BLOCK_SIZE)
600    }
601}
602
603// ============================================================================
604// Durable Block Store with WAL (Task 2: Persistent Block Storage)
605// ============================================================================
606
607use std::fs::{File, OpenOptions};
608use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
609use std::path::{Path, PathBuf};
610
/// Kinds of records that appear in the write-ahead log.
///
/// The discriminant is the on-disk type byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// Block write record
    BlockWrite = 1,
    /// Checkpoint marker
    Checkpoint = 2,
    /// Commit marker
    Commit = 3,
    /// Transaction begin
    TxnBegin = 4,
}

impl WalRecordType {
    /// Decode the on-disk type byte; `None` for unknown values so corrupt
    /// records are rejected rather than misinterpreted.
    fn from_byte(b: u8) -> Option<Self> {
        Some(match b {
            1 => WalRecordType::BlockWrite,
            2 => WalRecordType::Checkpoint,
            3 => WalRecordType::Commit,
            4 => WalRecordType::TxnBegin,
            _ => return None,
        })
    }
}
636
637/// WAL record header (fixed 33-byte layout)
638///
639/// Layout:
640/// ```text
641/// Offset  Size  Field
642/// 0       8     lsn (Log Sequence Number)
643/// 8       8     txn_id
644/// 16      1     record_type
645/// 17      8     page_id
646/// 25      4     data_len
647/// 29      4     crc32 (checksum of header + data)
648/// Total: 33 bytes
649/// ```
650#[derive(Debug, Clone)]
651pub struct WalRecordHeader {
652    pub lsn: u64,
653    pub txn_id: u64,
654    pub record_type: WalRecordType,
655    pub page_id: u64,
656    pub data_len: u32,
657    pub crc32: u32,
658}
659
660impl WalRecordHeader {
661    pub const SIZE: usize = 33;
662
663    /// Serialize to bytes
664    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
665        let mut buf = [0u8; Self::SIZE];
666        LittleEndian::write_u64(&mut buf[0..8], self.lsn);
667        LittleEndian::write_u64(&mut buf[8..16], self.txn_id);
668        buf[16] = self.record_type as u8;
669        LittleEndian::write_u64(&mut buf[17..25], self.page_id);
670        LittleEndian::write_u32(&mut buf[25..29], self.data_len);
671        LittleEndian::write_u32(&mut buf[29..33], self.crc32);
672        buf
673    }
674
675    /// Deserialize from bytes
676    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
677        if buf.len() < Self::SIZE {
678            return Err(SochDBError::Corruption(format!(
679                "WAL record header too short: {} < {}",
680                buf.len(),
681                Self::SIZE
682            )));
683        }
684
685        let record_type = WalRecordType::from_byte(buf[16]).ok_or_else(|| {
686            SochDBError::Corruption(format!("Invalid WAL record type: {}", buf[16]))
687        })?;
688
689        Ok(Self {
690            lsn: LittleEndian::read_u64(&buf[0..8]),
691            txn_id: LittleEndian::read_u64(&buf[8..16]),
692            record_type,
693            page_id: LittleEndian::read_u64(&buf[17..25]),
694            data_len: LittleEndian::read_u32(&buf[25..29]),
695            crc32: LittleEndian::read_u32(&buf[29..33]),
696        })
697    }
698
699    /// Compute CRC32 for header + data
700    pub fn compute_crc32(&self, data: &[u8]) -> u32 {
701        let mut hasher = crc32fast::Hasher::new();
702
703        // Hash header fields (excluding crc32 itself)
704        let mut header_buf = [0u8; 29];
705        LittleEndian::write_u64(&mut header_buf[0..8], self.lsn);
706        LittleEndian::write_u64(&mut header_buf[8..16], self.txn_id);
707        header_buf[16] = self.record_type as u8;
708        LittleEndian::write_u64(&mut header_buf[17..25], self.page_id);
709        LittleEndian::write_u32(&mut header_buf[25..29], self.data_len);
710
711        hasher.update(&header_buf);
712        hasher.update(data);
713        hasher.finalize()
714    }
715}
716
717/// WAL writer for durable writes
718pub struct WalWriter {
719    /// WAL file handle
720    file: BufWriter<File>,
721    /// Next LSN to assign
722    next_lsn: u64,
723    /// Path to WAL file
724    #[allow(dead_code)]
725    path: PathBuf,
726}
727
728impl WalWriter {
729    /// Open or create WAL file
730    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
731        let path = path.as_ref().to_path_buf();
732
733        // Open file for append
734        let file = OpenOptions::new()
735            .create(true)
736            .read(true)
737            .append(true)
738            .open(&path)?;
739
740        // Get current file size for next_lsn
741        let metadata = file.metadata()?;
742        let next_lsn = metadata.len();
743
744        Ok(Self {
745            file: BufWriter::new(file),
746            next_lsn,
747            path,
748        })
749    }
750
751    /// Append a WAL record
752    pub fn append(
753        &mut self,
754        txn_id: u64,
755        record_type: WalRecordType,
756        page_id: u64,
757        data: &[u8],
758    ) -> Result<u64> {
759        let lsn = self.next_lsn;
760
761        let mut header = WalRecordHeader {
762            lsn,
763            txn_id,
764            record_type,
765            page_id,
766            data_len: data.len() as u32,
767            crc32: 0, // Will be filled in
768        };
769
770        // Compute CRC32
771        header.crc32 = header.compute_crc32(data);
772
773        // Write header
774        let header_bytes = header.to_bytes();
775        self.file.write_all(&header_bytes)?;
776
777        // Write data
778        self.file.write_all(data)?;
779
780        // Update next LSN
781        self.next_lsn += WalRecordHeader::SIZE as u64 + data.len() as u64;
782
783        Ok(lsn)
784    }
785
786    /// Sync WAL to disk (fsync)
787    pub fn sync(&mut self) -> Result<()> {
788        self.file.flush()?;
789        self.file.get_ref().sync_all()?;
790        Ok(())
791    }
792
793    /// Get current LSN
794    pub fn current_lsn(&self) -> u64 {
795        self.next_lsn
796    }
797}
798
799/// WAL reader for recovery
800pub struct WalReader {
801    reader: BufReader<File>,
802    #[allow(dead_code)]
803    path: PathBuf,
804}
805
806impl WalReader {
807    /// Open WAL file for reading
808    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
809        let path = path.as_ref().to_path_buf();
810        let file = File::open(&path)?;
811
812        Ok(Self {
813            reader: BufReader::new(file),
814            path,
815        })
816    }
817
818    /// Read next WAL record
819    pub fn read_next(&mut self) -> Result<Option<(WalRecordHeader, Vec<u8>)>> {
820        // Read header
821        let mut header_buf = [0u8; WalRecordHeader::SIZE];
822        match self.reader.read_exact(&mut header_buf) {
823            Ok(()) => {}
824            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
825            Err(e) => return Err(e.into()),
826        }
827
828        let header = WalRecordHeader::from_bytes(&header_buf)?;
829
830        // Read data
831        let mut data = vec![0u8; header.data_len as usize];
832        self.reader.read_exact(&mut data)?;
833
834        // Verify CRC32
835        let computed_crc = header.compute_crc32(&data);
836        if computed_crc != header.crc32 {
837            return Err(SochDBError::Corruption(format!(
838                "WAL CRC mismatch at LSN {}: expected {:#x}, got {:#x}",
839                header.lsn, header.crc32, computed_crc
840            )));
841        }
842
843        Ok(Some((header, data)))
844    }
845
846    /// Iterate over all records
847    pub fn iter(&mut self) -> WalIterator<'_> {
848        WalIterator { reader: self }
849    }
850}
851
852/// Iterator over WAL records
853pub struct WalIterator<'a> {
854    reader: &'a mut WalReader,
855}
856
857impl<'a> Iterator for WalIterator<'a> {
858    type Item = Result<(WalRecordHeader, Vec<u8>)>;
859
860    fn next(&mut self) -> Option<Self::Item> {
861        match self.reader.read_next() {
862            Ok(Some(record)) => Some(Ok(record)),
863            Ok(None) => None,
864            Err(e) => Some(Err(e)),
865        }
866    }
867}
868
/// Durable block store with WAL-backed persistence
///
/// Provides ACID guarantees through write-ahead logging:
/// - Writes are first logged to WAL, then synced
/// - On crash, replays WAL to recover state
/// - Checkpoint mechanism to truncate WAL
pub struct DurableBlockStore {
    /// In-memory block store (cache of all written blocks)
    store: BlockStore,
    /// WAL writer, serialized behind a mutex
    wal: parking_lot::Mutex<WalWriter>,
    /// Data file that dirty pages are flushed to at checkpoint
    data_file: parking_lot::Mutex<File>,
    /// Dirty pages (page_id -> raw data) not yet flushed to the data file
    dirty_pages: RwLock<HashMap<u64, Vec<u8>>>,
    /// Checkpoint LSN (WAL can be truncated before this)
    checkpoint_lsn: AtomicU64,
    /// Path to data directory (holds wal.log and blocks.dat)
    data_dir: PathBuf,
    /// Next page ID to allocate
    next_page_id: AtomicU64,
}
891
892impl DurableBlockStore {
    /// Open or create a durable block store
    ///
    /// Creates `data_dir` if missing and opens two files inside it:
    /// `wal.log` (append-only WAL) and `blocks.dat` (checkpoint target).
    ///
    /// NOTE(review): the WAL is not replayed here; `recover` exists as a
    /// separate step — confirm callers invoke it after a crash.
    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();

        // Create directory if needed
        std::fs::create_dir_all(&data_dir)?;

        let wal_path = data_dir.join("wal.log");
        let data_path = data_dir.join("blocks.dat");

        // Open WAL
        let wal = WalWriter::open(&wal_path)?;

        // Open data file; truncate(false) preserves existing checkpointed pages
        let data_file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&data_path)?;

        let store = Self {
            store: BlockStore::new(),
            wal: parking_lot::Mutex::new(wal),
            data_file: parking_lot::Mutex::new(data_file),
            dirty_pages: RwLock::new(HashMap::new()),
            checkpoint_lsn: AtomicU64::new(0),
            data_dir,
            next_page_id: AtomicU64::new(0),
        };

        Ok(store)
    }
926
    /// Write a block with WAL durability
    ///
    /// 1. Writes to WAL and fsyncs
    /// 2. Updates in-memory store
    /// 3. Marks page as dirty (will be flushed at checkpoint)
    ///
    /// NOTE(review): there is no rollback of the WAL record if the in-memory
    /// write fails after the sync; recovery would replay it. Confirm that is
    /// the intended semantics.
    pub fn write_block(&self, txn_id: u64, data: &[u8]) -> Result<BlockRef> {
        // Allocate page ID
        let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);

        // Write to WAL first — the record must be durable before state changes
        {
            let mut wal = self.wal.lock();
            wal.append(txn_id, WalRecordType::BlockWrite, page_id, data)?;
            wal.sync()?; // Durability point
        }

        // Write to in-memory store
        let block_ref = self.store.write_block(data)?;

        // Mark as dirty (for checkpoint flushing)
        self.dirty_pages.write().insert(page_id, data.to_vec());

        Ok(block_ref)
    }
951
952    /// Read a block
953    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
954        self.store.read_block(block_ref)
955    }
956
957    /// Commit a transaction
958    pub fn commit(&self, txn_id: u64) -> Result<u64> {
959        let mut wal = self.wal.lock();
960        let lsn = wal.append(txn_id, WalRecordType::Commit, 0, &[])?;
961        wal.sync()?; // Durability point for commit
962        Ok(lsn)
963    }
964
965    /// Create a checkpoint
966    ///
967    /// 1. Flushes all dirty pages to data file
968    /// 2. Writes checkpoint marker to WAL
969    /// 3. Updates checkpoint LSN (WAL can be truncated before this)
970    pub fn checkpoint(&self) -> Result<u64> {
971        // Collect dirty pages
972        let dirty: Vec<(u64, Vec<u8>)> = {
973            let mut pages = self.dirty_pages.write();
974            pages.drain().collect()
975        };
976
977        // Flush to data file
978        {
979            let mut file = self.data_file.lock();
980            for (page_id, data) in &dirty {
981                let offset = *page_id * (DEFAULT_BLOCK_SIZE as u64 + BlockHeader::SIZE as u64);
982                file.seek(SeekFrom::Start(offset))?;
983                file.write_all(data)?;
984            }
985            file.sync_all()?;
986        }
987
988        // Write checkpoint marker
989        let lsn = {
990            let mut wal = self.wal.lock();
991            let lsn = wal.append(0, WalRecordType::Checkpoint, 0, &[])?;
992            wal.sync()?;
993            lsn
994        };
995
996        // Update checkpoint LSN
997        self.checkpoint_lsn.store(lsn, Ordering::SeqCst);
998
999        Ok(lsn)
1000    }
1001
1002    /// Recover from WAL after crash
1003    ///
1004    /// Replays all committed transactions from the last checkpoint.
1005    pub fn recover(&mut self) -> Result<RecoveryStats> {
1006        let wal_path = self.data_dir.join("wal.log");
1007
1008        if !wal_path.exists() {
1009            return Ok(RecoveryStats::default());
1010        }
1011
1012        let mut reader = WalReader::open(&wal_path)?;
1013        let mut stats = RecoveryStats::default();
1014
1015        // Track active transactions
1016        let mut pending_txns: HashMap<u64, Vec<(u64, Vec<u8>)>> = HashMap::new();
1017        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();
1018
1019        // Read all WAL records
1020        for record_result in reader.iter() {
1021            let (header, data) = record_result?;
1022            stats.records_read += 1;
1023
1024            match header.record_type {
1025                WalRecordType::BlockWrite => {
1026                    pending_txns
1027                        .entry(header.txn_id)
1028                        .or_default()
1029                        .push((header.page_id, data));
1030                }
1031                WalRecordType::Commit => {
1032                    committed_txns.insert(header.txn_id);
1033                    stats.txns_committed += 1;
1034                }
1035                WalRecordType::Checkpoint => {
1036                    self.checkpoint_lsn.store(header.lsn, Ordering::SeqCst);
1037                    stats.checkpoints_found += 1;
1038                }
1039                WalRecordType::TxnBegin => {
1040                    // Just track
1041                }
1042            }
1043        }
1044
1045        // Redo committed transactions
1046        for txn_id in &committed_txns {
1047            if let Some(writes) = pending_txns.remove(txn_id) {
1048                for (page_id, data) in writes {
1049                    self.store.write_block(&data)?;
1050                    self.next_page_id.fetch_max(page_id + 1, Ordering::SeqCst);
1051                    stats.blocks_recovered += 1;
1052                }
1053            }
1054        }
1055
1056        // Count aborted transactions (uncommitted)
1057        stats.txns_aborted = pending_txns.len();
1058
1059        Ok(stats)
1060    }
1061
1062    /// Get statistics
1063    pub fn stats(&self) -> DurableBlockStoreStats {
1064        let store_stats = self.store.stats();
1065        DurableBlockStoreStats {
1066            block_stats: store_stats,
1067            dirty_page_count: self.dirty_pages.read().len(),
1068            checkpoint_lsn: self.checkpoint_lsn.load(Ordering::SeqCst),
1069            wal_size: self.wal.lock().current_lsn(),
1070        }
1071    }
1072}
1073
/// Recovery statistics
///
/// Returned by `DurableBlockStore::recover` to summarize what was
/// replayed from the write-ahead log after a crash.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Number of WAL records read
    pub records_read: usize,
    /// Number of committed transactions
    pub txns_committed: usize,
    /// Number of aborted transactions (not committed before crash)
    pub txns_aborted: usize,
    /// Number of blocks recovered
    pub blocks_recovered: usize,
    /// Number of checkpoints found
    pub checkpoints_found: usize,
}
1088
/// Durable block store statistics
///
/// Snapshot produced by `DurableBlockStore::stats`.
#[derive(Debug, Clone)]
pub struct DurableBlockStoreStats {
    /// Block store stats
    pub block_stats: BlockStoreStats,
    /// Number of dirty pages (not yet flushed)
    pub dirty_page_count: usize,
    /// Last checkpoint LSN
    pub checkpoint_lsn: u64,
    /// Current WAL size (bytes)
    // NOTE(review): populated from the WAL writer's current LSN; this is
    // only the byte size if LSNs are byte offsets — confirm.
    pub wal_size: u64,
}
1101
#[cfg(test)]
mod tests {
    //! Unit tests for block storage: fixed-layout serialization of
    //! `BlockHeader` and `BlockRef`, compression selection and
    //! roundtrips, the file block manager, and the WAL-backed durable
    //! store (commit, checkpoint, crash recovery).
    use super::*;

    // Basic write/read roundtrip through the in-memory store.
    #[test]
    fn test_block_store_write_read() {
        let store = BlockStore::new();

        let data = b"Hello, SochFS block storage!";
        let block_ref = store.write_block(data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_compression_selection() {
        let store = BlockStore::new();

        // Small data: no compression
        let small = b"hi";
        assert_eq!(store.select_compression(small), BlockCompression::None);

        // TOON-like content (NOTE: compression stub returns None for now)
        let toon = b"users[5]{id,name}:\n1,Alice\n2,Bob\n3,Charlie";
        // In production with zstd, this would return Zstd
        let compression = store.select_compression(toon);
        assert!(compression == BlockCompression::Zstd || compression == BlockCompression::None);

        // JSON content (NOTE: compression stub returns None for now)
        let json = br#"{"users": [{"id": 1, "name": "Alice"}]}"#;
        // In production with lz4, this would return Lz4
        let compression = store.select_compression(json);
        assert!(compression == BlockCompression::Lz4 || compression == BlockCompression::None);
    }

    #[test]
    fn test_lz4_compression() {
        let store = BlockStore::new();

        let data = "Hello, world! ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Lz4)
            .unwrap();

        // NOTE: With stub compression, compressed_len == original_len
        // In production with lz4, compressed_len < original_len
        // For now, just verify data integrity
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_zstd_compression() {
        let store = BlockStore::new();

        let data = "TOON format is very repetitive ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Zstd)
            .unwrap();

        // NOTE: With stub compression, compressed_len == original_len
        // In production with zstd, compressed_len < original_len
        // For now, just verify data integrity
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    // Files larger than the block size must be split and reassembled
    // losslessly.
    #[test]
    fn test_file_block_manager() {
        let manager = FileBlockManager::new(1024);

        let data = "Test data ".repeat(500); // ~5KB, multiple blocks
        let blocks = manager.write_file(data.as_bytes()).unwrap();

        assert!(blocks.len() > 1); // Should have multiple blocks

        let read_data = manager.read_file(&blocks).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_stats() {
        let store = BlockStore::new();

        // Write some compressible data
        let data = "Repetitive data pattern ".repeat(100);
        store.write_block(data.as_bytes()).unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        // NOTE: With stub compression, ratio is 1.0
        // In production, ratio would be > 1.0 for compressible data
        assert!(stats.compression_ratio >= 1.0);
    }

    // ========================================================================
    // Task 1: Fixed-Layout BlockHeader Tests
    // ========================================================================

    // Pins the exact on-disk layout: 4-byte magic, 1-byte compression
    // tag, then three little-endian u32 fields — 17 bytes total.
    #[test]
    fn test_block_header_fixed_layout() {
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Zstd as u8,
            original_size: 4096,
            compressed_size: 1024,
            checksum: 0xDEADBEEF,
        };

        let bytes = header.to_bytes();

        // Verify exact 17-byte size
        assert_eq!(bytes.len(), 17);

        // Verify magic at offset 0-3
        assert_eq!(&bytes[0..4], b"TBLK");

        // Verify compression at offset 4
        assert_eq!(bytes[4], BlockCompression::Zstd as u8);

        // Verify original_size at offset 5-8 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[5..9]), 4096);

        // Verify compressed_size at offset 9-12 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[9..13]), 1024);

        // Verify checksum at offset 13-16 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[13..17]), 0xDEADBEEF);
    }

    #[test]
    fn test_block_header_roundtrip() {
        let original = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Lz4 as u8,
            original_size: 65536,
            compressed_size: 32768,
            checksum: 0x12345678,
        };

        let bytes = original.to_bytes();
        let recovered = BlockHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.original_size, original.original_size);
        assert_eq!(recovered.compressed_size, original.compressed_size);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_header_invalid_magic() {
        let mut bytes = [0u8; 17];
        bytes[0..4].copy_from_slice(b"XXXX"); // Invalid magic

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid block magic"));
    }

    #[test]
    fn test_block_header_too_short() {
        let bytes = [0u8; 10]; // Only 10 bytes, need 17

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    // ========================================================================
    // Task 5: BlockRef Error Propagation Tests
    // ========================================================================

    // Pins the exact 21-byte BlockRef layout: u64 offset, two u32
    // lengths, 1-byte compression tag, u32 checksum (all little-endian).
    #[test]
    fn test_block_ref_fixed_layout() {
        let block_ref = BlockRef {
            store_offset: 0x123456789ABCDEF0,
            compressed_len: 4096,
            original_len: 8192,
            compression: BlockCompression::Zstd,
            checksum: 0xCAFEBABE,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // Verify exact 21-byte size
        assert_eq!(bytes.len(), 21);

        // Verify offset at 0-7 (little-endian)
        assert_eq!(LittleEndian::read_u64(&bytes[0..8]), 0x123456789ABCDEF0);

        // Verify compressed_len at 8-11
        assert_eq!(LittleEndian::read_u32(&bytes[8..12]), 4096);

        // Verify original_len at 12-15
        assert_eq!(LittleEndian::read_u32(&bytes[12..16]), 8192);

        // Verify compression at 16
        assert_eq!(bytes[16], BlockCompression::Zstd as u8);

        // Verify checksum at 17-20
        assert_eq!(LittleEndian::read_u32(&bytes[17..21]), 0xCAFEBABE);
    }

    #[test]
    fn test_block_ref_roundtrip() {
        let original = BlockRef {
            store_offset: u64::MAX, // Test large values
            compressed_len: u32::MAX,
            original_len: u32::MAX,
            compression: BlockCompression::None,
            checksum: u32::MAX,
        };

        let bytes = original.to_bytes().unwrap();
        let recovered = BlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.store_offset, original.store_offset);
        assert_eq!(recovered.compressed_len, original.compressed_len);
        assert_eq!(recovered.original_len, original.original_len);
        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_ref_too_short() {
        let bytes = [0u8; 10]; // Only 10 bytes, need 21

        let result = BlockRef::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlockRef too short"));
    }

    #[test]
    fn test_cross_platform_compatibility() {
        // Test that serialization is deterministic regardless of platform
        let block_ref = BlockRef {
            store_offset: 0x0102030405060708,
            compressed_len: 0x0A0B0C0D,
            original_len: 0x0E0F1011,
            compression: BlockCompression::Lz4,
            checksum: 0x12131415,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // These exact byte values should be the same on any platform
        // (little-endian encoding)
        assert_eq!(bytes[0], 0x08); // LSB of offset
        assert_eq!(bytes[7], 0x01); // MSB of offset
        assert_eq!(bytes[8], 0x0D); // LSB of compressed_len
        assert_eq!(bytes[17], 0x15); // LSB of checksum
    }

    // ========================================================================
    // Task 2: LZ4/ZSTD Production Compression Tests
    // ========================================================================

    #[test]
    fn test_lz4_compression_roundtrip() {
        let store = BlockStore::new();

        // Create compressible data (repetitive pattern)
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        // Verify compression actually happened (data was compressible)
        assert!(block_ref.compressed_len <= block_ref.original_len);
    }

    #[test]
    fn test_zstd_compression_roundtrip() {
        let store = BlockStore::new();

        // Create highly compressible data (all zeros)
        let data = vec![0u8; 8192];

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        // Verify compression was very effective
        assert!(block_ref.compressed_len < block_ref.original_len / 2);
    }

    // Incompressible input must still roundtrip losslessly (store may
    // fall back to storing it uncompressed).
    #[test]
    fn test_compression_fallback_on_incompressible() {
        let store = BlockStore::new();

        // Create random-looking data (incompressible)
        let mut data = vec![0u8; 256];
        for (i, byte) in data.iter_mut().enumerate().take(256) {
            *byte = ((i * 17 + 31) % 256) as u8; // Pseudo-random
        }

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
    }

    #[test]
    fn test_automatic_compression_selection() {
        let store = BlockStore::new();

        // Test JSON content (should select LZ4)
        let json_data = br#"{"name": "test", "value": 123, "items": [1, 2, 3]}"#.repeat(10);
        let json_ref = store.write_block(&json_data).unwrap();
        let json_recovered = store.read_block(&json_ref).unwrap();
        assert_eq!(json_recovered, json_data);

        // Test TOON content (should select ZSTD)
        let mut soch_data = vec![0u8; 256];
        soch_data[0..4].copy_from_slice(b"TOON"); // Magic header
        let soch_ref = store.write_block(&soch_data).unwrap();
        let soch_recovered = store.read_block(&soch_ref).unwrap();
        assert_eq!(soch_recovered, soch_data);
    }

    #[test]
    fn test_small_data_no_compression() {
        let store = BlockStore::new();

        // Data smaller than 128 bytes should not be compressed
        let small_data = vec![42u8; 64];
        let block_ref = store.write_block(&small_data).unwrap();

        // Should be stored uncompressed
        assert_eq!(block_ref.compression, BlockCompression::None);

        let recovered = store.read_block(&block_ref).unwrap();
        assert_eq!(recovered, small_data);
    }

    #[test]
    fn test_compression_stats() {
        let store = BlockStore::new();

        // Write compressible data
        let data = vec![0u8; 4096];
        store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(
            stats.compression_ratio > 1.0,
            "Compression should reduce size"
        );
        assert!(stats.total_original_bytes > stats.total_compressed_bytes);
    }

    // ========================================================================
    // Task 2: Durable Block Store with WAL Tests
    // ========================================================================

    #[test]
    fn test_wal_record_header_roundtrip() {
        let original = WalRecordHeader {
            lsn: 12345,
            txn_id: 67890,
            record_type: WalRecordType::BlockWrite,
            page_id: 42,
            data_len: 4096,
            crc32: 0xDEADBEEF,
        };

        let bytes = original.to_bytes();
        let recovered = WalRecordHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, original.lsn);
        assert_eq!(recovered.txn_id, original.txn_id);
        assert_eq!(recovered.record_type, original.record_type);
        assert_eq!(recovered.page_id, original.page_id);
        assert_eq!(recovered.data_len, original.data_len);
        assert_eq!(recovered.crc32, original.crc32);
    }

    // CRC must be deterministic and sensitive to the payload bytes.
    #[test]
    fn test_wal_crc32() {
        let header = WalRecordHeader {
            lsn: 100,
            txn_id: 1,
            record_type: WalRecordType::BlockWrite,
            page_id: 0,
            data_len: 4,
            crc32: 0,
        };

        let data = b"test";
        let crc1 = header.compute_crc32(data);
        let crc2 = header.compute_crc32(data);

        assert_eq!(crc1, crc2, "CRC should be deterministic");

        // Different data should produce different CRC
        let different_data = b"TEST";
        let crc3 = header.compute_crc32(different_data);
        assert_ne!(crc1, crc3, "Different data should have different CRC");
    }

    #[test]
    fn test_durable_block_store_basic() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        // Write a block
        let data = b"Hello, durable block store!";
        let block_ref = store.write_block(1, data).unwrap();

        // Read it back
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);

        // Commit the transaction
        store.commit(1).unwrap();

        // Check stats: page stays dirty until the next checkpoint
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 1);
    }

    #[test]
    fn test_durable_block_store_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        // Write some blocks
        store.write_block(1, b"block1").unwrap();
        store.write_block(1, b"block2").unwrap();
        store.write_block(1, b"block3").unwrap();
        store.commit(1).unwrap();

        // Checkpoint should flush dirty pages
        let checkpoint_lsn = store.checkpoint().unwrap();
        assert!(checkpoint_lsn > 0);

        // Dirty pages should be cleared
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 0);
        assert_eq!(stats.checkpoint_lsn, checkpoint_lsn);
    }

    // Crash simulation: dropping the store without a checkpoint leaves
    // only the WAL; reopening + recover must replay the committed txn.
    #[test]
    fn test_durable_block_store_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Phase 1: Write and commit
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"data1").unwrap();
            store.write_block(1, b"data2").unwrap();
            store.commit(1).unwrap();

            // Don't checkpoint - simulate crash before flush
        }

        // Phase 2: Recover
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            // Should have recovered the committed transaction
            assert_eq!(stats.txns_committed, 1);
            assert_eq!(stats.blocks_recovered, 2);
            assert_eq!(stats.txns_aborted, 0);
        }
    }

    #[test]
    fn test_durable_block_store_uncommitted_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Phase 1: Write but don't commit (simulate crash)
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"uncommitted_data").unwrap();
            // NO commit - transaction should be aborted on recovery
        }

        // Phase 2: Recover
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            // Uncommitted transaction should be aborted
            assert_eq!(stats.txns_committed, 0);
            assert_eq!(stats.txns_aborted, 1);
            assert_eq!(stats.blocks_recovered, 0);
        }
    }

    // Full WAL cycle: write + sync, then reopen and read every record
    // back in order with intact payloads.
    #[test]
    fn test_wal_writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write records
        {
            let mut writer = WalWriter::open(&wal_path).unwrap();
            writer.append(1, WalRecordType::TxnBegin, 0, &[]).unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 0, b"data1")
                .unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 1, b"data2")
                .unwrap();
            writer.append(1, WalRecordType::Commit, 0, &[]).unwrap();
            writer.sync().unwrap();
        }

        // Read records
        {
            let mut reader = WalReader::open(&wal_path).unwrap();
            let mut records = Vec::new();
            for record in reader.iter() {
                records.push(record.unwrap());
            }

            assert_eq!(records.len(), 4);
            assert_eq!(records[0].0.record_type, WalRecordType::TxnBegin);
            assert_eq!(records[1].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[1].1, b"data1");
            assert_eq!(records[2].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[2].1, b"data2");
            assert_eq!(records[3].0.record_type, WalRecordType::Commit);
        }
    }
}