sochdb_core/
block_storage.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! File Data Block Storage (Task 12)
16//!
17//! Integrates PayloadStore as SochFS data block backend:
18//! - O(1) append-only writes (no LSM compaction overhead)
19//! - Transparent compression (LZ4/ZSTD based on content type)
20//! - Block reference tracking for garbage collection
21//!
22//! ## Block Storage Architecture
23//!
24//! ```text
25//! ┌─────────────────────────────────────────────────────────┐
26//! │                      Inode Table                        │
27//! │  inode_id=7, blocks=[(off=0, len=4096, cmp=LZ4),       │
28//! │                      (off=4096, len=2048, cmp=ZSTD)]   │
29//! └─────────────────────────────────────────────────────────┘
30//!                            │
31//!                            ▼
32//! ┌─────────────────────────────────────────────────────────┐
33//! │                    PayloadStore                         │
34//! │  Offset 0:    [LZ4 header][compressed block 1]         │
35//! │  Offset 4096: [ZSTD header][compressed block 2]        │
36//! └─────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Architectural Improvements (Production Tasks)
40//!
41//! ### Task 1: Fixed-Layout BlockHeader
42//!
43//! Uses deterministic 17-byte fixed layout with explicit little-endian encoding:
44//! ```text
45//! Offset  Size  Field            Type
46//! 0       4     magic            [u8; 4] = "TBLK"
47//! 4       1     compression      u8
48//! 5       4     original_size    u32 (LE)
49//! 9       4     compressed_size  u32 (LE)
50//! 13      4     checksum         u32 (LE, CRC32)
51//! Total: 17 bytes
52//! ```
53//!
54//! ### Task 5: Error Propagation
55//!
56//! All serialization methods return `Result<T, SochDBError>` instead of
57//! using `unwrap_or_default()` which silently swallows errors.
58
59use byteorder::{ByteOrder, LittleEndian};
60use parking_lot::RwLock;
61use serde::{Deserialize, Serialize};
62use std::collections::HashMap;
63use std::sync::atomic::{AtomicU64, Ordering};
64
65use crate::{Result, SochDBError};
66
/// Default block size (4KB) used when chunking file data
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Maximum block size (1MB); larger requested sizes are capped by `FileBlockManager::new`
pub const MAX_BLOCK_SIZE: usize = 1024 * 1024;
72
/// Compression type for blocks
///
/// The discriminant value is the on-disk tag byte stored in `BlockHeader`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum BlockCompression {
    /// No compression (payload stored as raw bytes)
    None = 0,
    /// LZ4 (fast)
    Lz4 = 1,
    /// ZSTD (high ratio)
    Zstd = 2,
}
84
85impl BlockCompression {
86    pub fn from_byte(b: u8) -> Self {
87        match b {
88            1 => BlockCompression::Lz4,
89            2 => BlockCompression::Zstd,
90            _ => BlockCompression::None,
91        }
92    }
93
94    pub fn to_byte(&self) -> u8 {
95        *self as u8
96    }
97}
98
/// Block reference stored in inode
///
/// Identifies one block in the payload store and carries enough metadata
/// (lengths, codec, checksum) to read and verify it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockRef {
    /// Offset in payload store (byte position of the block's header)
    pub store_offset: u64,
    /// Compressed length of the stored payload (bytes, excluding header)
    pub compressed_len: u32,
    /// Original (uncompressed) length in bytes
    pub original_len: u32,
    /// Compression type used when the block was written
    pub compression: BlockCompression,
    /// CRC32 checksum of the stored (post-compression) payload
    pub checksum: u32,
}
113
114/// Fixed size of BlockRef when serialized (21 bytes)
115/// Layout: offset(8) + compressed_len(4) + original_len(4) + compression(1) + checksum(4)
116impl BlockRef {
117    /// Fixed serialization size
118    pub const SERIALIZED_SIZE: usize = 21;
119
120    /// Serialize to fixed-layout bytes (Task 5: returns Result)
121    pub fn to_bytes(&self) -> Result<[u8; Self::SERIALIZED_SIZE]> {
122        let mut buf = [0u8; Self::SERIALIZED_SIZE];
123        LittleEndian::write_u64(&mut buf[0..8], self.store_offset);
124        LittleEndian::write_u32(&mut buf[8..12], self.compressed_len);
125        LittleEndian::write_u32(&mut buf[12..16], self.original_len);
126        buf[16] = self.compression as u8;
127        LittleEndian::write_u32(&mut buf[17..21], self.checksum);
128        Ok(buf)
129    }
130
131    /// Deserialize from fixed-layout bytes
132    pub fn from_bytes(data: &[u8]) -> Result<Self> {
133        if data.len() < Self::SERIALIZED_SIZE {
134            return Err(SochDBError::Serialization(format!(
135                "BlockRef too short: {} < {}",
136                data.len(),
137                Self::SERIALIZED_SIZE
138            )));
139        }
140
141        Ok(Self {
142            store_offset: LittleEndian::read_u64(&data[0..8]),
143            compressed_len: LittleEndian::read_u32(&data[8..12]),
144            original_len: LittleEndian::read_u32(&data[12..16]),
145            compression: BlockCompression::from_byte(data[16]),
146            checksum: LittleEndian::read_u32(&data[17..21]),
147        })
148    }
149}
150
/// Block header in payload store (Task 1: Fixed 17-byte layout)
///
/// Layout (all integers little-endian):
/// ```text
/// Offset  Size  Field
/// 0       4     magic = "TBLK"
/// 4       1     compression
/// 5       4     original_size
/// 9       4     compressed_size
/// 13      4     checksum (CRC32)
/// Total: 17 bytes
/// ```
#[derive(Debug, Clone)]
pub struct BlockHeader {
    /// Magic bytes ("TBLK")
    pub magic: [u8; 4],
    /// Compression type tag byte (see `BlockCompression`)
    pub compression: u8,
    /// Original (uncompressed) size in bytes
    pub original_size: u32,
    /// Compressed (stored) size in bytes
    pub compressed_size: u32,
    /// CRC32 checksum of the stored payload
    pub checksum: u32,
}
176
177impl BlockHeader {
178    pub const MAGIC: [u8; 4] = *b"TBLK";
179    pub const SIZE: usize = 17; // Fixed layout size
180
181    /// Serialize to fixed 17-byte layout (Task 1: deterministic serialization)
182    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
183        let mut buf = [0u8; Self::SIZE];
184        buf[0..4].copy_from_slice(&Self::MAGIC);
185        buf[4] = self.compression;
186        LittleEndian::write_u32(&mut buf[5..9], self.original_size);
187        LittleEndian::write_u32(&mut buf[9..13], self.compressed_size);
188        LittleEndian::write_u32(&mut buf[13..17], self.checksum);
189        buf
190    }
191
192    /// Deserialize from 17-byte buffer (Task 1: validates magic)
193    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
194        if buf.len() < Self::SIZE {
195            return Err(SochDBError::Corruption(format!(
196                "BlockHeader too short: {} < {}",
197                buf.len(),
198                Self::SIZE
199            )));
200        }
201
202        if buf[0..4] != Self::MAGIC {
203            return Err(SochDBError::Corruption(format!(
204                "Invalid block magic: expected {:?}, got {:?}",
205                Self::MAGIC,
206                &buf[0..4]
207            )));
208        }
209
210        Ok(Self {
211            magic: Self::MAGIC,
212            compression: buf[4],
213            original_size: LittleEndian::read_u32(&buf[5..9]),
214            compressed_size: LittleEndian::read_u32(&buf[9..13]),
215            checksum: LittleEndian::read_u32(&buf[13..17]),
216        })
217    }
218}
219
/// File block storage backed by append-only store
///
/// All state is in memory: block bytes in a growable buffer plus an index
/// and reference counts keyed by store offset. See `DurableBlockStore` for
/// the WAL-backed persistent variant.
pub struct BlockStore {
    /// Data storage (append-only byte buffer; offsets index into it)
    data: RwLock<Vec<u8>>,
    /// Next write offset (reserved atomically before taking the data lock)
    next_offset: AtomicU64,
    /// Block index: store offset -> BlockRef
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Reference counts for GC, keyed by store offset
    ref_counts: RwLock<HashMap<u64, u32>>,
}
231
232impl BlockStore {
233    /// Create a new block store
234    pub fn new() -> Self {
235        Self {
236            data: RwLock::new(Vec::new()),
237            next_offset: AtomicU64::new(0),
238            index: RwLock::new(HashMap::new()),
239            ref_counts: RwLock::new(HashMap::new()),
240        }
241    }
242
243    /// Write a block with automatic compression selection
244    ///
245    /// Returns BlockRef for the written block.
246    pub fn write_block(&self, data: &[u8]) -> Result<BlockRef> {
247        let compression = self.select_compression(data);
248        self.write_block_with_compression(data, compression)
249    }
250
251    /// Write a block with specified compression
252    pub fn write_block_with_compression(
253        &self,
254        data: &[u8],
255        compression: BlockCompression,
256    ) -> Result<BlockRef> {
257        // Compress data
258        let compressed = self.compress(data, compression)?;
259
260        // Calculate checksum
261        let checksum = crc32fast::hash(&compressed);
262
263        // Allocate offset
264        let header_size = BlockHeader::SIZE;
265        let total_size = header_size + compressed.len();
266        let offset = self
267            .next_offset
268            .fetch_add(total_size as u64, Ordering::SeqCst);
269
270        // Create header
271        let header = BlockHeader {
272            magic: BlockHeader::MAGIC,
273            compression: compression as u8,
274            original_size: data.len() as u32,
275            compressed_size: compressed.len() as u32,
276            checksum,
277        };
278
279        // Write to store
280        {
281            let mut store = self.data.write();
282            store.resize((offset + total_size as u64) as usize, 0);
283
284            // Write header (Task 1: fixed-layout serialization)
285            let header_bytes = header.to_bytes();
286            store[offset as usize..offset as usize + header_size].copy_from_slice(&header_bytes);
287
288            // Write data
289            store[offset as usize + header_size..offset as usize + total_size]
290                .copy_from_slice(&compressed);
291        }
292
293        // Create block reference
294        let block_ref = BlockRef {
295            store_offset: offset,
296            compressed_len: compressed.len() as u32,
297            original_len: data.len() as u32,
298            compression,
299            checksum,
300        };
301
302        // Update index
303        self.index.write().insert(offset, block_ref.clone());
304
305        // Initialize ref count
306        self.ref_counts.write().insert(offset, 1);
307
308        Ok(block_ref)
309    }
310
311    /// Read a block
312    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
313        let offset = block_ref.store_offset as usize;
314        let header_size = BlockHeader::SIZE;
315        let total_size = header_size + block_ref.compressed_len as usize;
316
317        // Read from store
318        let compressed = {
319            let store = self.data.read();
320            if offset + total_size > store.len() {
321                return Err(SochDBError::NotFound("Block not found".into()));
322            }
323
324            // Verify header (Task 1: use fixed-layout deserialization)
325            let header_bytes = &store[offset..offset + header_size];
326            let _header = BlockHeader::from_bytes(header_bytes)?;
327
328            // Read data
329            store[offset + header_size..offset + total_size].to_vec()
330        };
331
332        // Verify checksum
333        let checksum = crc32fast::hash(&compressed);
334        if checksum != block_ref.checksum {
335            return Err(SochDBError::Corruption("Block checksum mismatch".into()));
336        }
337
338        // Decompress
339        self.decompress(
340            &compressed,
341            block_ref.compression,
342            block_ref.original_len as usize,
343        )
344    }
345
346    /// Select compression based on data content
347    fn select_compression(&self, data: &[u8]) -> BlockCompression {
348        if data.len() < 128 {
349            return BlockCompression::None; // Too small to compress
350        }
351
352        // Detect content type
353        if is_soch_content(data) {
354            BlockCompression::Zstd // TOON is repetitive, good for ZSTD
355        } else if is_json_content(data) || is_compressible(data) {
356            BlockCompression::Lz4 // JSON and compressible content: fast compression
357        } else {
358            BlockCompression::None // Binary/random data
359        }
360    }
361
362    /// Compress data with fallback to raw on compression failure or expansion
363    ///
364    /// Returns the compressed data or original data if:
365    /// - Compression fails
366    /// - Compressed size >= original size (compression expanded the data)
367    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
368        match compression {
369            BlockCompression::None => Ok(data.to_vec()),
370            BlockCompression::Lz4 => {
371                match lz4::block::compress(data, None, false) {
372                    Ok(compressed) => {
373                        // Fallback to raw if compression didn't help
374                        if compressed.len() >= data.len() {
375                            Ok(data.to_vec())
376                        } else {
377                            Ok(compressed)
378                        }
379                    }
380                    Err(_) => {
381                        // Fallback to raw on compression failure
382                        Ok(data.to_vec())
383                    }
384                }
385            }
386            BlockCompression::Zstd => {
387                // Default compression level (3) for balance of speed/ratio
388                match zstd::encode_all(data, 3) {
389                    Ok(compressed) => {
390                        // Fallback to raw if compression didn't help
391                        if compressed.len() >= data.len() {
392                            Ok(data.to_vec())
393                        } else {
394                            Ok(compressed)
395                        }
396                    }
397                    Err(_) => {
398                        // Fallback to raw on compression failure
399                        Ok(data.to_vec())
400                    }
401                }
402            }
403        }
404    }
405
406    /// Decompress data with automatic format detection
407    fn decompress(
408        &self,
409        data: &[u8],
410        compression: BlockCompression,
411        original_size: usize,
412    ) -> Result<Vec<u8>> {
413        match compression {
414            BlockCompression::None => Ok(data.to_vec()),
415            BlockCompression::Lz4 => {
416                // If data matches original size, it's uncompressed (fallback case)
417                if data.len() == original_size {
418                    return Ok(data.to_vec());
419                }
420
421                lz4::block::decompress(data, Some(original_size as i32)).map_err(|e| {
422                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
423                })
424            }
425            BlockCompression::Zstd => {
426                // If data matches original size, it's uncompressed (fallback case)
427                if data.len() == original_size {
428                    return Ok(data.to_vec());
429                }
430
431                zstd::decode_all(data).map_err(|e| {
432                    SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
433                })
434            }
435        }
436    }
437
438    /// Increment reference count
439    pub fn add_ref(&self, offset: u64) {
440        let mut refs = self.ref_counts.write();
441        *refs.entry(offset).or_insert(0) += 1;
442    }
443
444    /// Decrement reference count
445    pub fn release_ref(&self, offset: u64) -> bool {
446        let mut refs = self.ref_counts.write();
447        if let Some(count) = refs.get_mut(&offset) {
448            *count = count.saturating_sub(1);
449            return *count == 0;
450        }
451        false
452    }
453
454    /// Get storage statistics
455    pub fn stats(&self) -> BlockStoreStats {
456        let data = self.data.read();
457        let index = self.index.read();
458
459        let mut total_original = 0u64;
460        let mut total_compressed = 0u64;
461
462        for block_ref in index.values() {
463            total_original += block_ref.original_len as u64;
464            total_compressed += block_ref.compressed_len as u64;
465        }
466
467        BlockStoreStats {
468            total_bytes: data.len() as u64,
469            block_count: index.len(),
470            total_original_bytes: total_original,
471            total_compressed_bytes: total_compressed,
472            compression_ratio: if total_compressed > 0 {
473                total_original as f64 / total_compressed as f64
474            } else {
475                1.0
476            },
477        }
478    }
479}
480
481impl Default for BlockStore {
482    fn default() -> Self {
483        Self::new()
484    }
485}
486
/// Block store statistics
#[derive(Debug, Clone, Default)]
pub struct BlockStoreStats {
    /// Total bytes in store (headers + stored payloads)
    pub total_bytes: u64,
    /// Number of blocks currently indexed
    pub block_count: usize,
    /// Total original bytes (before compression)
    pub total_original_bytes: u64,
    /// Total compressed bytes (as stored, excluding headers)
    pub total_compressed_bytes: u64,
    /// Compression ratio (original / compressed; 1.0 when the store is empty)
    pub compression_ratio: f64,
}
501
/// Heuristic check: does this data look like TOON format?
///
/// Samples up to the first 100 bytes (lossy UTF-8) and requires all three
/// structural characters TOON uses together: '[', '{' and ':'.
pub fn is_soch_content(data: &[u8]) -> bool {
    // Too small to classify reliably.
    if data.len() < 10 {
        return false;
    }

    let sample_len = data.len().min(100);
    let sample = String::from_utf8_lossy(&data[..sample_len]);
    ['[', '{', ':'].iter().all(|&c| sample.contains(c))
}
513
/// Check if data looks like JSON
///
/// JSON documents start with `{` or `[`, optionally preceded by
/// insignificant whitespace. Generalized to skip leading ASCII whitespace
/// so documents such as `"  {\"a\":1}"` are also recognized; empty and
/// all-whitespace inputs return `false`.
pub fn is_json_content(data: &[u8]) -> bool {
    data.iter()
        .find(|b| !b.is_ascii_whitespace())
        .map_or(false, |&b| b == b'{' || b == b'[')
}
524
/// Heuristic check: is this data likely to benefit from compression?
///
/// Counts distinct byte values in a sample of up to 256 bytes; fewer than
/// 50% unique bytes suggests repetitive, compressible content. Inputs
/// shorter than 64 bytes are considered not worth compressing.
pub fn is_compressible(data: &[u8]) -> bool {
    if data.len() < 64 {
        return false;
    }

    let sample = &data[..data.len().min(256)];
    let mut seen = [false; 256];
    let mut unique = 0usize;

    for &b in sample {
        let slot = &mut seen[b as usize];
        if !*slot {
            *slot = true;
            unique += 1;
        }
    }

    unique < sample.len() / 2
}
546
/// File block manager - higher level interface for file I/O
///
/// Splits file data into fixed-size chunks and stores each chunk as one
/// block in an in-memory `BlockStore`.
pub struct FileBlockManager {
    /// Block store
    store: BlockStore,
    /// Block size (chunk size in bytes)
    block_size: usize,
}
554
555impl FileBlockManager {
556    /// Create new file block manager
557    pub fn new(block_size: usize) -> Self {
558        Self {
559            store: BlockStore::new(),
560            block_size: block_size.min(MAX_BLOCK_SIZE),
561        }
562    }
563
564    /// Write file data, returns block references
565    pub fn write_file(&self, data: &[u8]) -> Result<Vec<BlockRef>> {
566        let mut blocks = Vec::new();
567
568        for chunk in data.chunks(self.block_size) {
569            let block_ref = self.store.write_block(chunk)?;
570            blocks.push(block_ref);
571        }
572
573        Ok(blocks)
574    }
575
576    /// Read file data from block references
577    pub fn read_file(&self, blocks: &[BlockRef]) -> Result<Vec<u8>> {
578        let mut data = Vec::new();
579
580        for block_ref in blocks {
581            let block_data = self.store.read_block(block_ref)?;
582            data.extend(block_data);
583        }
584
585        Ok(data)
586    }
587
588    /// Get underlying store stats
589    pub fn stats(&self) -> BlockStoreStats {
590        self.store.stats()
591    }
592}
593
594impl Default for FileBlockManager {
595    fn default() -> Self {
596        Self::new(DEFAULT_BLOCK_SIZE)
597    }
598}
599
600// ============================================================================
601// Durable Block Store with WAL (Task 2: Persistent Block Storage)
602// ============================================================================
603
604use std::fs::{File, OpenOptions};
605use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
606use std::path::{Path, PathBuf};
607
/// WAL record types
///
/// The discriminant is the on-disk tag byte in `WalRecordHeader`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// Block write record (payload carries the block bytes)
    BlockWrite = 1,
    /// Checkpoint marker
    Checkpoint = 2,
    /// Commit marker
    Commit = 3,
    /// Transaction begin
    TxnBegin = 4,
}
621
622impl WalRecordType {
623    fn from_byte(b: u8) -> Option<Self> {
624        match b {
625            1 => Some(WalRecordType::BlockWrite),
626            2 => Some(WalRecordType::Checkpoint),
627            3 => Some(WalRecordType::Commit),
628            4 => Some(WalRecordType::TxnBegin),
629            _ => None,
630        }
631    }
632}
633
/// WAL record header (fixed 33-byte layout)
///
/// Layout:
/// ```text
/// Offset  Size  Field
/// 0       8     lsn (Log Sequence Number)
/// 8       8     txn_id
/// 16      1     record_type
/// 17      8     page_id
/// 25      4     data_len
/// 29      4     crc32 (checksum of header + data)
/// ```
/// Total: 33 bytes
#[derive(Debug, Clone)]
pub struct WalRecordHeader {
    /// Log sequence number (byte offset of this record in the WAL file)
    pub lsn: u64,
    /// Transaction this record belongs to
    pub txn_id: u64,
    /// Record kind (see `WalRecordType`)
    pub record_type: WalRecordType,
    /// Page this record applies to (0 for commit/checkpoint markers)
    pub page_id: u64,
    /// Payload length in bytes
    pub data_len: u32,
    /// CRC32 over the first 29 header bytes plus the payload
    pub crc32: u32,
}
656
657impl WalRecordHeader {
658    pub const SIZE: usize = 33;
659
660    /// Serialize to bytes
661    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
662        let mut buf = [0u8; Self::SIZE];
663        LittleEndian::write_u64(&mut buf[0..8], self.lsn);
664        LittleEndian::write_u64(&mut buf[8..16], self.txn_id);
665        buf[16] = self.record_type as u8;
666        LittleEndian::write_u64(&mut buf[17..25], self.page_id);
667        LittleEndian::write_u32(&mut buf[25..29], self.data_len);
668        LittleEndian::write_u32(&mut buf[29..33], self.crc32);
669        buf
670    }
671
672    /// Deserialize from bytes
673    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
674        if buf.len() < Self::SIZE {
675            return Err(SochDBError::Corruption(format!(
676                "WAL record header too short: {} < {}",
677                buf.len(),
678                Self::SIZE
679            )));
680        }
681
682        let record_type = WalRecordType::from_byte(buf[16]).ok_or_else(|| {
683            SochDBError::Corruption(format!("Invalid WAL record type: {}", buf[16]))
684        })?;
685
686        Ok(Self {
687            lsn: LittleEndian::read_u64(&buf[0..8]),
688            txn_id: LittleEndian::read_u64(&buf[8..16]),
689            record_type,
690            page_id: LittleEndian::read_u64(&buf[17..25]),
691            data_len: LittleEndian::read_u32(&buf[25..29]),
692            crc32: LittleEndian::read_u32(&buf[29..33]),
693        })
694    }
695
696    /// Compute CRC32 for header + data
697    pub fn compute_crc32(&self, data: &[u8]) -> u32 {
698        let mut hasher = crc32fast::Hasher::new();
699
700        // Hash header fields (excluding crc32 itself)
701        let mut header_buf = [0u8; 29];
702        LittleEndian::write_u64(&mut header_buf[0..8], self.lsn);
703        LittleEndian::write_u64(&mut header_buf[8..16], self.txn_id);
704        header_buf[16] = self.record_type as u8;
705        LittleEndian::write_u64(&mut header_buf[17..25], self.page_id);
706        LittleEndian::write_u32(&mut header_buf[25..29], self.data_len);
707
708        hasher.update(&header_buf);
709        hasher.update(data);
710        hasher.finalize()
711    }
712}
713
/// WAL writer for durable writes
///
/// Appends records through a `BufWriter`; records become durable only
/// after `sync()` flushes and fsyncs the file.
pub struct WalWriter {
    /// WAL file handle (buffered; see `sync`)
    file: BufWriter<File>,
    /// Next LSN to assign (byte offset of the next record in the file)
    next_lsn: u64,
    /// Path to WAL file
    #[allow(dead_code)]
    path: PathBuf,
}
724
725impl WalWriter {
726    /// Open or create WAL file
727    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
728        let path = path.as_ref().to_path_buf();
729
730        // Open file for append
731        let file = OpenOptions::new()
732            .create(true)
733            .read(true)
734            .append(true)
735            .open(&path)?;
736
737        // Get current file size for next_lsn
738        let metadata = file.metadata()?;
739        let next_lsn = metadata.len();
740
741        Ok(Self {
742            file: BufWriter::new(file),
743            next_lsn,
744            path,
745        })
746    }
747
748    /// Append a WAL record
749    pub fn append(
750        &mut self,
751        txn_id: u64,
752        record_type: WalRecordType,
753        page_id: u64,
754        data: &[u8],
755    ) -> Result<u64> {
756        let lsn = self.next_lsn;
757
758        let mut header = WalRecordHeader {
759            lsn,
760            txn_id,
761            record_type,
762            page_id,
763            data_len: data.len() as u32,
764            crc32: 0, // Will be filled in
765        };
766
767        // Compute CRC32
768        header.crc32 = header.compute_crc32(data);
769
770        // Write header
771        let header_bytes = header.to_bytes();
772        self.file.write_all(&header_bytes)?;
773
774        // Write data
775        self.file.write_all(data)?;
776
777        // Update next LSN
778        self.next_lsn += WalRecordHeader::SIZE as u64 + data.len() as u64;
779
780        Ok(lsn)
781    }
782
783    /// Sync WAL to disk (fsync)
784    pub fn sync(&mut self) -> Result<()> {
785        self.file.flush()?;
786        self.file.get_ref().sync_all()?;
787        Ok(())
788    }
789
790    /// Get current LSN
791    pub fn current_lsn(&self) -> u64 {
792        self.next_lsn
793    }
794}
795
/// WAL reader for recovery
///
/// Sequentially decodes records from the start of the WAL file.
pub struct WalReader {
    // Buffered sequential reader over the WAL file
    reader: BufReader<File>,
    #[allow(dead_code)]
    path: PathBuf,
}
802
impl WalReader {
    /// Open WAL file for reading (positioned at the first record)
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = File::open(&path)?;

        Ok(Self {
            reader: BufReader::new(file),
            path,
        })
    }

    /// Read next WAL record
    ///
    /// Returns `Ok(None)` on a clean end of log. A partial *header* at the
    /// tail (torn write) is also treated as end of log, but an EOF inside
    /// the *payload* propagates as an error, as does a CRC mismatch.
    pub fn read_next(&mut self) -> Result<Option<(WalRecordHeader, Vec<u8>)>> {
        // Read header
        let mut header_buf = [0u8; WalRecordHeader::SIZE];
        match self.reader.read_exact(&mut header_buf) {
            Ok(()) => {}
            // EOF before a complete header: no more records.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        }

        let header = WalRecordHeader::from_bytes(&header_buf)?;

        // Read data (exactly data_len bytes; a short read is an error)
        let mut data = vec![0u8; header.data_len as usize];
        self.reader.read_exact(&mut data)?;

        // Verify CRC32 (covers header fields + payload)
        let computed_crc = header.compute_crc32(&data);
        if computed_crc != header.crc32 {
            return Err(SochDBError::Corruption(format!(
                "WAL CRC mismatch at LSN {}: expected {:#x}, got {:#x}",
                header.lsn, header.crc32, computed_crc
            )));
        }

        Ok(Some((header, data)))
    }

    /// Iterate over all records
    pub fn iter(&mut self) -> WalIterator<'_> {
        WalIterator { reader: self }
    }
}
848
/// Iterator over WAL records
///
/// Wraps `WalReader::read_next`: ends at clean EOF, yields `Err` items for
/// corrupt records.
pub struct WalIterator<'a> {
    reader: &'a mut WalReader,
}
853
854impl<'a> Iterator for WalIterator<'a> {
855    type Item = Result<(WalRecordHeader, Vec<u8>)>;
856
857    fn next(&mut self) -> Option<Self::Item> {
858        match self.reader.read_next() {
859            Ok(Some(record)) => Some(Ok(record)),
860            Ok(None) => None,
861            Err(e) => Some(Err(e)),
862        }
863    }
864}
865
/// Durable block store with WAL-backed persistence
///
/// Provides ACID guarantees through write-ahead logging:
/// - Writes are first logged to WAL, then synced
/// - On crash, replays WAL to recover state
/// - Checkpoint mechanism to truncate WAL
pub struct DurableBlockStore {
    /// In-memory block store (cache; blocks reach the data file at checkpoint)
    store: BlockStore,
    /// WAL writer
    wal: parking_lot::Mutex<WalWriter>,
    /// Data file for persistent blocks (written during checkpoint)
    data_file: parking_lot::Mutex<File>,
    /// Dirty pages not yet flushed to data file (page_id -> raw block bytes)
    dirty_pages: RwLock<HashMap<u64, Vec<u8>>>,
    /// Checkpoint LSN (WAL can be truncated before this)
    checkpoint_lsn: AtomicU64,
    /// Path to data directory
    data_dir: PathBuf,
    /// Next page ID (monotonically increasing)
    next_page_id: AtomicU64,
}
888
889impl DurableBlockStore {
    /// Open or create a durable block store
    ///
    /// Creates the data directory if needed, then opens (or creates) the
    /// WAL file `wal.log` and the block data file `blocks.dat`.
    ///
    /// NOTE(review): the in-memory store, checkpoint LSN and page counter
    /// all start empty/zero here — presumably `recover()` must be called
    /// after a restart to rebuild state from the WAL; confirm with callers.
    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();

        // Create directory if needed
        std::fs::create_dir_all(&data_dir)?;

        let wal_path = data_dir.join("wal.log");
        let data_path = data_dir.join("blocks.dat");

        // Open WAL (append mode; resumes LSNs from the existing file size)
        let wal = WalWriter::open(&wal_path)?;

        // Open data file, preserving existing contents (truncate(false))
        let data_file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&data_path)?;

        let store = Self {
            store: BlockStore::new(),
            wal: parking_lot::Mutex::new(wal),
            data_file: parking_lot::Mutex::new(data_file),
            dirty_pages: RwLock::new(HashMap::new()),
            checkpoint_lsn: AtomicU64::new(0),
            data_dir,
            next_page_id: AtomicU64::new(0),
        };

        Ok(store)
    }
923
    /// Write a block with WAL durability
    ///
    /// 1. Writes to WAL and fsyncs
    /// 2. Updates in-memory store
    /// 3. Marks page as dirty (will be flushed at checkpoint)
    ///
    /// NOTE(review): if the in-memory store write fails after the WAL
    /// append succeeded, an orphan WAL record remains — replay must be
    /// able to tolerate this; confirm in `recover()`.
    pub fn write_block(&self, txn_id: u64, data: &[u8]) -> Result<BlockRef> {
        // Allocate page ID (monotonic; never reused)
        let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);

        // Write to WAL first
        {
            let mut wal = self.wal.lock();
            wal.append(txn_id, WalRecordType::BlockWrite, page_id, data)?;
            wal.sync()?; // Durability point
        }

        // Write to in-memory store
        let block_ref = self.store.write_block(data)?;

        // Mark as dirty (for checkpoint flushing)
        self.dirty_pages.write().insert(page_id, data.to_vec());

        Ok(block_ref)
    }
948
    /// Read a block
    ///
    /// Served entirely from the in-memory store; the data file is not
    /// consulted on the read path.
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        self.store.read_block(block_ref)
    }
953
    /// Commit a transaction
    ///
    /// Appends a commit marker for `txn_id` and fsyncs the WAL; the commit
    /// is durable once this returns. Returns the commit record's LSN.
    pub fn commit(&self, txn_id: u64) -> Result<u64> {
        let mut wal = self.wal.lock();
        let lsn = wal.append(txn_id, WalRecordType::Commit, 0, &[])?;
        wal.sync()?; // Durability point for commit
        Ok(lsn)
    }
961
962    /// Create a checkpoint
963    ///
964    /// 1. Flushes all dirty pages to data file
965    /// 2. Writes checkpoint marker to WAL
966    /// 3. Updates checkpoint LSN (WAL can be truncated before this)
967    pub fn checkpoint(&self) -> Result<u64> {
968        // Collect dirty pages
969        let dirty: Vec<(u64, Vec<u8>)> = {
970            let mut pages = self.dirty_pages.write();
971            pages.drain().collect()
972        };
973
974        // Flush to data file
975        {
976            let mut file = self.data_file.lock();
977            for (page_id, data) in &dirty {
978                let offset = *page_id * (DEFAULT_BLOCK_SIZE as u64 + BlockHeader::SIZE as u64);
979                file.seek(SeekFrom::Start(offset))?;
980                file.write_all(data)?;
981            }
982            file.sync_all()?;
983        }
984
985        // Write checkpoint marker
986        let lsn = {
987            let mut wal = self.wal.lock();
988            let lsn = wal.append(0, WalRecordType::Checkpoint, 0, &[])?;
989            wal.sync()?;
990            lsn
991        };
992
993        // Update checkpoint LSN
994        self.checkpoint_lsn.store(lsn, Ordering::SeqCst);
995
996        Ok(lsn)
997    }
998
999    /// Recover from WAL after crash
1000    ///
1001    /// Replays all committed transactions from the last checkpoint.
1002    pub fn recover(&mut self) -> Result<RecoveryStats> {
1003        let wal_path = self.data_dir.join("wal.log");
1004
1005        if !wal_path.exists() {
1006            return Ok(RecoveryStats::default());
1007        }
1008
1009        let mut reader = WalReader::open(&wal_path)?;
1010        let mut stats = RecoveryStats::default();
1011
1012        // Track active transactions
1013        let mut pending_txns: HashMap<u64, Vec<(u64, Vec<u8>)>> = HashMap::new();
1014        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();
1015
1016        // Read all WAL records
1017        for record_result in reader.iter() {
1018            let (header, data) = record_result?;
1019            stats.records_read += 1;
1020
1021            match header.record_type {
1022                WalRecordType::BlockWrite => {
1023                    pending_txns
1024                        .entry(header.txn_id)
1025                        .or_default()
1026                        .push((header.page_id, data));
1027                }
1028                WalRecordType::Commit => {
1029                    committed_txns.insert(header.txn_id);
1030                    stats.txns_committed += 1;
1031                }
1032                WalRecordType::Checkpoint => {
1033                    self.checkpoint_lsn.store(header.lsn, Ordering::SeqCst);
1034                    stats.checkpoints_found += 1;
1035                }
1036                WalRecordType::TxnBegin => {
1037                    // Just track
1038                }
1039            }
1040        }
1041
1042        // Redo committed transactions
1043        for txn_id in &committed_txns {
1044            if let Some(writes) = pending_txns.remove(txn_id) {
1045                for (page_id, data) in writes {
1046                    self.store.write_block(&data)?;
1047                    self.next_page_id.fetch_max(page_id + 1, Ordering::SeqCst);
1048                    stats.blocks_recovered += 1;
1049                }
1050            }
1051        }
1052
1053        // Count aborted transactions (uncommitted)
1054        stats.txns_aborted = pending_txns.len();
1055
1056        Ok(stats)
1057    }
1058
1059    /// Get statistics
1060    pub fn stats(&self) -> DurableBlockStoreStats {
1061        let store_stats = self.store.stats();
1062        DurableBlockStoreStats {
1063            block_stats: store_stats,
1064            dirty_page_count: self.dirty_pages.read().len(),
1065            checkpoint_lsn: self.checkpoint_lsn.load(Ordering::SeqCst),
1066            wal_size: self.wal.lock().current_lsn(),
1067        }
1068    }
1069}
1070
/// Recovery statistics
///
/// Returned by `DurableBlockStore::recover` to summarize what the WAL
/// replay observed and applied.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Number of WAL records read
    pub records_read: usize,
    /// Number of committed transactions
    pub txns_committed: usize,
    /// Number of aborted transactions (not committed before crash)
    pub txns_aborted: usize,
    /// Number of blocks recovered
    pub blocks_recovered: usize,
    /// Number of checkpoints found
    pub checkpoints_found: usize,
}
1085
/// Durable block store statistics
///
/// Snapshot of the store's state as reported by `DurableBlockStore::stats`.
#[derive(Debug, Clone)]
pub struct DurableBlockStoreStats {
    /// Block store stats
    pub block_stats: BlockStoreStats,
    /// Number of dirty pages (not yet flushed)
    pub dirty_page_count: usize,
    /// Last checkpoint LSN
    pub checkpoint_lsn: u64,
    /// Current WAL size (bytes)
    // NOTE(review): populated from WalWriter::current_lsn(); this equals a
    // byte size only if LSNs are byte offsets — confirm in the WAL code.
    pub wal_size: u64,
}
1098
#[cfg(test)]
mod tests {
    //! Unit tests covering: BlockStore write/read round-trips, compression
    //! selection and codec round-trips, the fixed-layout BlockHeader (17 B)
    //! and BlockRef (21 B) encodings, WAL record format and CRC, and
    //! DurableBlockStore checkpoint/recovery behavior.

    use super::*;

    // Basic write/read round-trip through the in-memory store.
    #[test]
    fn test_block_store_write_read() {
        let store = BlockStore::new();

        let data = b"Hello, SochFS block storage!";
        let block_ref = store.write_block(data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);
    }

    // Content-type-based codec selection; tolerant of the stub implementation.
    #[test]
    fn test_compression_selection() {
        let store = BlockStore::new();

        // Small data: no compression
        let small = b"hi";
        assert_eq!(store.select_compression(small), BlockCompression::None);

        // TOON-like content (NOTE: compression stub returns None for now)
        let toon = b"users[5]{id,name}:\n1,Alice\n2,Bob\n3,Charlie";
        // In production with zstd, this would return Zstd
        let compression = store.select_compression(toon);
        assert!(compression == BlockCompression::Zstd || compression == BlockCompression::None);

        // JSON content (NOTE: compression stub returns None for now)
        let json = br#"{"users": [{"id": 1, "name": "Alice"}]}"#;
        // In production with lz4, this would return Lz4
        let compression = store.select_compression(json);
        assert!(compression == BlockCompression::Lz4 || compression == BlockCompression::None);
    }

    // Data integrity through the LZ4 path (stub-tolerant: only round-trip).
    #[test]
    fn test_lz4_compression() {
        let store = BlockStore::new();

        let data = "Hello, world! ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Lz4)
            .unwrap();

        // NOTE: With stub compression, compressed_len == original_len
        // In production with lz4, compressed_len < original_len
        // For now, just verify data integrity
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    // Data integrity through the ZSTD path (stub-tolerant: only round-trip).
    #[test]
    fn test_zstd_compression() {
        let store = BlockStore::new();

        let data = "TOON format is very repetitive ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Zstd)
            .unwrap();

        // NOTE: With stub compression, compressed_len == original_len
        // In production with zstd, compressed_len < original_len
        // For now, just verify data integrity
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    // Multi-block file splitting and reassembly.
    #[test]
    fn test_file_block_manager() {
        let manager = FileBlockManager::new(1024);

        let data = "Test data ".repeat(500); // ~5KB, multiple blocks
        let blocks = manager.write_file(data.as_bytes()).unwrap();

        assert!(blocks.len() > 1); // Should have multiple blocks

        let read_data = manager.read_file(&blocks).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_stats() {
        let store = BlockStore::new();

        // Write some compressible data
        let data = "Repetitive data pattern ".repeat(100);
        store.write_block(data.as_bytes()).unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        // NOTE: With stub compression, ratio is 1.0
        // In production, ratio would be > 1.0 for compressible data
        assert!(stats.compression_ratio >= 1.0);
    }

    // ========================================================================
    // Task 1: Fixed-Layout BlockHeader Tests
    // ========================================================================

    // Byte-exact layout check: magic(4) + codec(1) + sizes(4+4) + crc(4) = 17.
    #[test]
    fn test_block_header_fixed_layout() {
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Zstd as u8,
            original_size: 4096,
            compressed_size: 1024,
            checksum: 0xDEADBEEF,
        };

        let bytes = header.to_bytes();

        // Verify exact 17-byte size
        assert_eq!(bytes.len(), 17);

        // Verify magic at offset 0-3
        assert_eq!(&bytes[0..4], b"TBLK");

        // Verify compression at offset 4
        assert_eq!(bytes[4], BlockCompression::Zstd as u8);

        // Verify original_size at offset 5-8 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[5..9]), 4096);

        // Verify compressed_size at offset 9-12 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[9..13]), 1024);

        // Verify checksum at offset 13-16 (little-endian)
        assert_eq!(LittleEndian::read_u32(&bytes[13..17]), 0xDEADBEEF);
    }

    #[test]
    fn test_block_header_roundtrip() {
        let original = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Lz4 as u8,
            original_size: 65536,
            compressed_size: 32768,
            checksum: 0x12345678,
        };

        let bytes = original.to_bytes();
        let recovered = BlockHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.original_size, original.original_size);
        assert_eq!(recovered.compressed_size, original.compressed_size);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_header_invalid_magic() {
        let mut bytes = [0u8; 17];
        bytes[0..4].copy_from_slice(b"XXXX"); // Invalid magic

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid block magic"));
    }

    #[test]
    fn test_block_header_too_short() {
        let bytes = [0u8; 10]; // Only 10 bytes, need 17

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    // ========================================================================
    // Task 5: BlockRef Error Propagation Tests
    // ========================================================================

    // Byte-exact layout check: offset(8) + lens(4+4) + codec(1) + crc(4) = 21.
    #[test]
    fn test_block_ref_fixed_layout() {
        let block_ref = BlockRef {
            store_offset: 0x123456789ABCDEF0,
            compressed_len: 4096,
            original_len: 8192,
            compression: BlockCompression::Zstd,
            checksum: 0xCAFEBABE,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // Verify exact 21-byte size
        assert_eq!(bytes.len(), 21);

        // Verify offset at 0-7 (little-endian)
        assert_eq!(LittleEndian::read_u64(&bytes[0..8]), 0x123456789ABCDEF0);

        // Verify compressed_len at 8-11
        assert_eq!(LittleEndian::read_u32(&bytes[8..12]), 4096);

        // Verify original_len at 12-15
        assert_eq!(LittleEndian::read_u32(&bytes[12..16]), 8192);

        // Verify compression at 16
        assert_eq!(bytes[16], BlockCompression::Zstd as u8);

        // Verify checksum at 17-20
        assert_eq!(LittleEndian::read_u32(&bytes[17..21]), 0xCAFEBABE);
    }

    #[test]
    fn test_block_ref_roundtrip() {
        let original = BlockRef {
            store_offset: u64::MAX, // Test large values
            compressed_len: u32::MAX,
            original_len: u32::MAX,
            compression: BlockCompression::None,
            checksum: u32::MAX,
        };

        let bytes = original.to_bytes().unwrap();
        let recovered = BlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.store_offset, original.store_offset);
        assert_eq!(recovered.compressed_len, original.compressed_len);
        assert_eq!(recovered.original_len, original.original_len);
        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_ref_too_short() {
        let bytes = [0u8; 10]; // Only 10 bytes, need 21

        let result = BlockRef::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlockRef too short"));
    }

    #[test]
    fn test_cross_platform_compatibility() {
        // Test that serialization is deterministic regardless of platform
        let block_ref = BlockRef {
            store_offset: 0x0102030405060708,
            compressed_len: 0x0A0B0C0D,
            original_len: 0x0E0F1011,
            compression: BlockCompression::Lz4,
            checksum: 0x12131415,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // These exact byte values should be the same on any platform
        // (little-endian encoding)
        assert_eq!(bytes[0], 0x08); // LSB of offset
        assert_eq!(bytes[7], 0x01); // MSB of offset
        assert_eq!(bytes[8], 0x0D); // LSB of compressed_len
        assert_eq!(bytes[17], 0x15); // LSB of checksum
    }

    // ========================================================================
    // Task 2: LZ4/ZSTD Production Compression Tests
    // ========================================================================

    #[test]
    fn test_lz4_compression_roundtrip() {
        let store = BlockStore::new();

        // Create compressible data (repetitive pattern)
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        // Verify compression actually happened (data was compressible)
        assert!(block_ref.compressed_len <= block_ref.original_len);
    }

    #[test]
    fn test_zstd_compression_roundtrip() {
        let store = BlockStore::new();

        // Create highly compressible data (all zeros)
        let data = vec![0u8; 8192];

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        // Verify compression was very effective
        assert!(block_ref.compressed_len < block_ref.original_len / 2);
    }

    #[test]
    fn test_compression_fallback_on_incompressible() {
        let store = BlockStore::new();

        // Create random-looking data (incompressible)
        let mut data = vec![0u8; 256];
        for (i, byte) in data.iter_mut().enumerate().take(256) {
            *byte = ((i * 17 + 31) % 256) as u8; // Pseudo-random
        }

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
    }

    #[test]
    fn test_automatic_compression_selection() {
        let store = BlockStore::new();

        // Test JSON content (should select LZ4)
        let json_data = br#"{"name": "test", "value": 123, "items": [1, 2, 3]}"#.repeat(10);
        let json_ref = store.write_block(&json_data).unwrap();
        let json_recovered = store.read_block(&json_ref).unwrap();
        assert_eq!(json_recovered, json_data);

        // Test TOON content (should select ZSTD)
        let mut soch_data = vec![0u8; 256];
        soch_data[0..4].copy_from_slice(b"TOON"); // Magic header
        let soch_ref = store.write_block(&soch_data).unwrap();
        let soch_recovered = store.read_block(&soch_ref).unwrap();
        assert_eq!(soch_recovered, soch_data);
    }

    #[test]
    fn test_small_data_no_compression() {
        let store = BlockStore::new();

        // Data smaller than 128 bytes should not be compressed
        let small_data = vec![42u8; 64];
        let block_ref = store.write_block(&small_data).unwrap();

        // Should be stored uncompressed
        assert_eq!(block_ref.compression, BlockCompression::None);

        let recovered = store.read_block(&block_ref).unwrap();
        assert_eq!(recovered, small_data);
    }

    #[test]
    fn test_compression_stats() {
        let store = BlockStore::new();

        // Write compressible data
        let data = vec![0u8; 4096];
        store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(
            stats.compression_ratio > 1.0,
            "Compression should reduce size"
        );
        assert!(stats.total_original_bytes > stats.total_compressed_bytes);
    }

    // ========================================================================
    // Task 2: Durable Block Store with WAL Tests
    // ========================================================================

    #[test]
    fn test_wal_record_header_roundtrip() {
        let original = WalRecordHeader {
            lsn: 12345,
            txn_id: 67890,
            record_type: WalRecordType::BlockWrite,
            page_id: 42,
            data_len: 4096,
            crc32: 0xDEADBEEF,
        };

        let bytes = original.to_bytes();
        let recovered = WalRecordHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, original.lsn);
        assert_eq!(recovered.txn_id, original.txn_id);
        assert_eq!(recovered.record_type, original.record_type);
        assert_eq!(recovered.page_id, original.page_id);
        assert_eq!(recovered.data_len, original.data_len);
        assert_eq!(recovered.crc32, original.crc32);
    }

    // CRC must be deterministic and sensitive to payload changes.
    #[test]
    fn test_wal_crc32() {
        let header = WalRecordHeader {
            lsn: 100,
            txn_id: 1,
            record_type: WalRecordType::BlockWrite,
            page_id: 0,
            data_len: 4,
            crc32: 0,
        };

        let data = b"test";
        let crc1 = header.compute_crc32(data);
        let crc2 = header.compute_crc32(data);

        assert_eq!(crc1, crc2, "CRC should be deterministic");

        // Different data should produce different CRC
        let different_data = b"TEST";
        let crc3 = header.compute_crc32(different_data);
        assert_ne!(crc1, crc3, "Different data should have different CRC");
    }

    #[test]
    fn test_durable_block_store_basic() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        // Write a block
        let data = b"Hello, durable block store!";
        let block_ref = store.write_block(1, data).unwrap();

        // Read it back
        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);

        // Commit the transaction
        store.commit(1).unwrap();

        // Check stats
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 1);
    }

    #[test]
    fn test_durable_block_store_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        // Write some blocks
        store.write_block(1, b"block1").unwrap();
        store.write_block(1, b"block2").unwrap();
        store.write_block(1, b"block3").unwrap();
        store.commit(1).unwrap();

        // Checkpoint should flush dirty pages
        let checkpoint_lsn = store.checkpoint().unwrap();
        assert!(checkpoint_lsn > 0);

        // Dirty pages should be cleared
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 0);
        assert_eq!(stats.checkpoint_lsn, checkpoint_lsn);
    }

    // Committed-but-unflushed data must survive a simulated crash.
    #[test]
    fn test_durable_block_store_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Phase 1: Write and commit
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"data1").unwrap();
            store.write_block(1, b"data2").unwrap();
            store.commit(1).unwrap();

            // Don't checkpoint - simulate crash before flush
        }

        // Phase 2: Recover
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            // Should have recovered the committed transaction
            assert_eq!(stats.txns_committed, 1);
            assert_eq!(stats.blocks_recovered, 2);
            assert_eq!(stats.txns_aborted, 0);
        }
    }

    // Uncommitted data must be discarded (counted as aborted) on recovery.
    #[test]
    fn test_durable_block_store_uncommitted_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Phase 1: Write but don't commit (simulate crash)
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"uncommitted_data").unwrap();
            // NO commit - transaction should be aborted on recovery
        }

        // Phase 2: Recover
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            // Uncommitted transaction should be aborted
            assert_eq!(stats.txns_committed, 0);
            assert_eq!(stats.txns_aborted, 1);
            assert_eq!(stats.blocks_recovered, 0);
        }
    }

    #[test]
    fn test_wal_writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write records
        {
            let mut writer = WalWriter::open(&wal_path).unwrap();
            writer.append(1, WalRecordType::TxnBegin, 0, &[]).unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 0, b"data1")
                .unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 1, b"data2")
                .unwrap();
            writer.append(1, WalRecordType::Commit, 0, &[]).unwrap();
            writer.sync().unwrap();
        }

        // Read records
        {
            let mut reader = WalReader::open(&wal_path).unwrap();
            let mut records = Vec::new();
            for record in reader.iter() {
                records.push(record.unwrap());
            }

            assert_eq!(records.len(), 4);
            assert_eq!(records[0].0.record_type, WalRecordType::TxnBegin);
            assert_eq!(records[1].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[1].1, b"data1");
            assert_eq!(records[2].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[2].1, b"data2");
            assert_eq!(records[3].0.record_type, WalRecordType::Commit);
        }
    }
}