Skip to main content

reddb_server/storage/wal/record.rs

1use crate::storage::engine::crc32::{crc32, crc32_update};
2use std::io::{self, Read};
3
/// Magic bytes identifying a WAL file (ASCII `RDBW`).
///
/// NOTE(review): the file header that carries these bytes is not written or
/// checked in this module — confirm against the WAL file writer/reader.
pub const WAL_MAGIC: &[u8; 4] = b"RDBW";

/// WAL file format version.
///
/// Version 2 introduced the `PageWriteCompressed` and `TxCommitBatch` record
/// encodings; the v1 `PageWrite` encoding is still emitted for small payloads.
pub const WAL_VERSION: u8 = 2;

/// Minimum `PageWrite` payload size (in bytes) at which `WalRecord::encode`
/// attempts zstd compression.
///
/// Below this size the compression framing overhead tends to outweigh any
/// savings, so such payloads are always written uncompressed.
const COMPRESS_THRESHOLD: usize = 256;
13
/// Codec applied to the payload of a `PageWriteCompressed` record.
///
/// Stored on disk as a single tag byte (the enum discriminant) in the record
/// header, between the page id and the original length.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Compression {
    /// Payload bytes are stored verbatim.
    None = 0,
    /// Payload bytes are a zstd frame.
    Zstd = 1,
}

impl Compression {
    /// Decode an on-disk compression tag.
    ///
    /// Returns `None` (the `Option` variant) for any tag this build does not
    /// recognise, so the caller can report it as invalid data.
    fn from_u8(v: u8) -> Option<Self> {
        let codec = match v {
            0 => Compression::None,
            1 => Compression::Zstd,
            _ => return None,
        };
        Some(codec)
    }
}
31
/// On-disk tag byte that opens every WAL record and selects its layout.
///
/// Every record ends with a CRC32 over the tag byte and all payload bytes
/// that follow it. All integers in every layout are little-endian.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    /// Transaction start. Layout (after the type byte): `[TxID: 8][CRC: 4]`.
    Begin = 1,
    /// Transaction commit. Layout (after the type byte): `[TxID: 8][CRC: 4]`.
    Commit = 2,
    /// Transaction rollback. Layout (after the type byte): `[TxID: 8][CRC: 4]`.
    Rollback = 3,
    /// Legacy uncompressed page write (v1 format — still written for
    /// small payloads to avoid compression overhead).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][PageID: 4][DataLen: 4][Data: N][CRC: 4]
    /// ```
    PageWrite = 4,
    /// Checkpoint marker. Layout (after the type byte): `[LSN: 8][CRC: 4]`.
    Checkpoint = 5,
    /// Compressed page write (v2 format).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][PageID: 4][Compression: 1][OrigLen: 4][DataLen: 4][Data: N][CRC: 4]
    /// ```
    /// `OrigLen` is the original (uncompressed) size; needed to pre-allocate
    /// the decompression buffer.
    PageWriteCompressed = 6,
    /// Logical autocommit transaction commit batch (v2 format).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][ActionCount: 4][[DataLen: 4][Data: N]...][CRC: 4]
    /// ```
    TxCommitBatch = 7,
    /// Full-page image (FPI). Captures a complete page before its first
    /// modification within a checkpoint cycle so torn-page recovery can
    /// replay the pristine image before redo replays subsequent
    /// `PageWrite`s. Enables `fold_dwb_into_wal` to retire the `-dwb`
    /// sidecar (gh-478).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][PageID: 4][CkptEpoch: 8][DataLen: 4][Data: N][CRC: 4]
    /// ```
    FullPageImage = 8,
}

impl RecordType {
    /// Decode a record tag byte.
    ///
    /// Returns `None` for values that do not name a known record type.
    pub fn from_u8(v: u8) -> Option<Self> {
        const KNOWN: [RecordType; 8] = [
            RecordType::Begin,
            RecordType::Commit,
            RecordType::Rollback,
            RecordType::PageWrite,
            RecordType::Checkpoint,
            RecordType::PageWriteCompressed,
            RecordType::TxCommitBatch,
            RecordType::FullPageImage,
        ];
        // With `#[repr(u8)]`, each variant's discriminant is its tag byte.
        KNOWN.iter().copied().find(|&kind| kind as u8 == v)
    }
}
87
/// A single entry in the write-ahead log, in decoded form.
///
/// `WalRecord::encode` produces the on-disk bytes: a `RecordType` tag, the
/// variant's fields, then a CRC32. `WalRecord::read` parses them back. Page
/// payloads are always held uncompressed here; compression is purely an
/// on-disk detail.
#[derive(Debug, Clone, PartialEq)]
pub enum WalRecord {
    /// Start of a transaction
    Begin { tx_id: u64 },
    /// Commit of a transaction
    Commit { tx_id: u64 },
    /// Rollback of a transaction
    Rollback { tx_id: u64 },
    /// Write of a page — always carries uncompressed data (transparent to
    /// callers: `read()` decompresses on-the-fly).
    PageWrite {
        tx_id: u64,
        page_id: u32,
        // Full uncompressed payload; `encode()` decides whether it is stored
        // zstd-compressed (`PageWriteCompressed`) or as-is (`PageWrite`).
        data: Vec<u8>,
    },
    /// Atomic logical commit batch. Recovery applies all actions in
    /// order iff this complete record and checksum are present.
    ///
    /// Each action is an opaque byte string as far as this module is
    /// concerned: it is written length-prefixed and returned unchanged.
    TxCommitBatch { tx_id: u64, actions: Vec<Vec<u8>> },
    /// Full-page image — pristine page bytes captured before the first
    /// modification per checkpoint cycle. Recovery applies these before
    /// redo so torn writes are healed without the `-dwb` sidecar.
    FullPageImage {
        tx_id: u64,
        page_id: u32,
        // Checkpoint epoch (cycle) the image belongs to.
        ckpt_epoch: u64,
        // Complete page bytes; always stored uncompressed on disk.
        data: Vec<u8>,
    },
    /// Checkpoint marker (indicates up to which LSN pages are flushed)
    Checkpoint { lsn: u64 },
}
119
120impl WalRecord {
121    /// Serialize record to bytes (including checksum).
122    ///
123    /// `PageWrite` records whose payload is ≥ `COMPRESS_THRESHOLD` bytes are
124    /// compressed with zstd level 3 and emitted as `PageWriteCompressed`.
125    /// Smaller payloads use the plain `PageWrite` encoding (no overhead).
126    pub fn encode(&self) -> Vec<u8> {
127        let mut buf = Vec::new();
128
129        // Layout (non-PageWrite):
130        // [Type: 1]
131        // [TxID/LSN: 8]
132        // [Checksum: 4]
133        //
134        // PageWrite (uncompressed):
135        // [Type: 1][TxID: 8][PageID: 4][DataLen: 4][Data: N][CRC: 4]
136        //
137        // PageWriteCompressed:
138        // [Type: 1][TxID: 8][PageID: 4][Compression: 1][OrigLen: 4][DataLen: 4][Data: N][CRC: 4]
139        //
140        // TxCommitBatch:
141        // [Type: 1][TxID: 8][ActionCount: 4][[DataLen: 4][Data: N]...][CRC: 4]
142
143        match self {
144            WalRecord::Begin { tx_id } => {
145                buf.push(RecordType::Begin as u8);
146                buf.extend_from_slice(&tx_id.to_le_bytes());
147            }
148            WalRecord::Commit { tx_id } => {
149                buf.push(RecordType::Commit as u8);
150                buf.extend_from_slice(&tx_id.to_le_bytes());
151            }
152            WalRecord::Rollback { tx_id } => {
153                buf.push(RecordType::Rollback as u8);
154                buf.extend_from_slice(&tx_id.to_le_bytes());
155            }
156            WalRecord::PageWrite {
157                tx_id,
158                page_id,
159                data,
160            } => {
161                if data.len() >= COMPRESS_THRESHOLD {
162                    // Try zstd compression; fall back to uncompressed if it expands.
163                    if let Ok(compressed) =
164                        zstd::bulk::compress(data.as_slice(), /* level */ 3)
165                    {
166                        if compressed.len() < data.len() {
167                            // Compressed is smaller — use compressed format.
168                            buf.push(RecordType::PageWriteCompressed as u8);
169                            buf.extend_from_slice(&tx_id.to_le_bytes());
170                            buf.extend_from_slice(&page_id.to_le_bytes());
171                            buf.push(Compression::Zstd as u8);
172                            buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); // orig_len
173                            buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
174                            buf.extend_from_slice(&compressed);
175                            let checksum = crc32(&buf);
176                            buf.extend_from_slice(&checksum.to_le_bytes());
177                            return buf;
178                        }
179                    }
180                }
181                // Uncompressed path (small payload or compression expanded).
182                buf.push(RecordType::PageWrite as u8);
183                buf.extend_from_slice(&tx_id.to_le_bytes());
184                buf.extend_from_slice(&page_id.to_le_bytes());
185                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
186                buf.extend_from_slice(data);
187            }
188            WalRecord::TxCommitBatch { tx_id, actions } => {
189                buf.push(RecordType::TxCommitBatch as u8);
190                buf.extend_from_slice(&tx_id.to_le_bytes());
191                buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
192                for action in actions {
193                    buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
194                    buf.extend_from_slice(action);
195                }
196            }
197            WalRecord::FullPageImage {
198                tx_id,
199                page_id,
200                ckpt_epoch,
201                data,
202            } => {
203                buf.push(RecordType::FullPageImage as u8);
204                buf.extend_from_slice(&tx_id.to_le_bytes());
205                buf.extend_from_slice(&page_id.to_le_bytes());
206                buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
207                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
208                buf.extend_from_slice(data);
209            }
210            WalRecord::Checkpoint { lsn } => {
211                buf.push(RecordType::Checkpoint as u8);
212                buf.extend_from_slice(&lsn.to_le_bytes());
213            }
214        }
215
216        // Calculate and append checksum
217        let checksum = crc32(&buf);
218        buf.extend_from_slice(&checksum.to_le_bytes());
219
220        buf
221    }
222
223    /// Read a record from a reader.
224    ///
225    /// Handles both v1 (`PageWrite`) and v2 (`PageWriteCompressed`) record
226    /// formats transparently — callers always receive uncompressed data.
227    pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
228        // Read type byte
229        let mut type_buf = [0u8; 1];
230        match reader.read_exact(&mut type_buf) {
231            Ok(_) => (),
232            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
233            Err(e) => return Err(e),
234        };
235
236        let record_type = RecordType::from_u8(type_buf[0])
237            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
238
239        // Start checksum calculation
240        let mut running_crc = crc32_update(0, &type_buf);
241
242        let record = match record_type {
243            RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
244                let mut buf = [0u8; 8];
245                reader.read_exact(&mut buf)?;
246                running_crc = crc32_update(running_crc, &buf);
247                let tx_id = u64::from_le_bytes(buf);
248
249                match record_type {
250                    RecordType::Begin => WalRecord::Begin { tx_id },
251                    RecordType::Commit => WalRecord::Commit { tx_id },
252                    RecordType::Rollback => WalRecord::Rollback { tx_id },
253                    _ => unreachable!(),
254                }
255            }
256            RecordType::PageWrite => {
257                // Read TxID
258                let mut tx_buf = [0u8; 8];
259                reader.read_exact(&mut tx_buf)?;
260                running_crc = crc32_update(running_crc, &tx_buf);
261                let tx_id = u64::from_le_bytes(tx_buf);
262
263                // Read PageID
264                let mut page_buf = [0u8; 4];
265                reader.read_exact(&mut page_buf)?;
266                running_crc = crc32_update(running_crc, &page_buf);
267                let page_id = u32::from_le_bytes(page_buf);
268
269                // Read Length
270                let mut len_buf = [0u8; 4];
271                reader.read_exact(&mut len_buf)?;
272                running_crc = crc32_update(running_crc, &len_buf);
273                let len = u32::from_le_bytes(len_buf) as usize;
274
275                // Read Data
276                let mut data = vec![0u8; len];
277                reader.read_exact(&mut data)?;
278                running_crc = crc32_update(running_crc, &data);
279
280                WalRecord::PageWrite {
281                    tx_id,
282                    page_id,
283                    data,
284                }
285            }
286            RecordType::PageWriteCompressed => {
287                // Read TxID
288                let mut tx_buf = [0u8; 8];
289                reader.read_exact(&mut tx_buf)?;
290                running_crc = crc32_update(running_crc, &tx_buf);
291                let tx_id = u64::from_le_bytes(tx_buf);
292
293                // Read PageID
294                let mut page_buf = [0u8; 4];
295                reader.read_exact(&mut page_buf)?;
296                running_crc = crc32_update(running_crc, &page_buf);
297                let page_id = u32::from_le_bytes(page_buf);
298
299                // Read Compression algorithm byte
300                let mut comp_buf = [0u8; 1];
301                reader.read_exact(&mut comp_buf)?;
302                running_crc = crc32_update(running_crc, &comp_buf);
303                let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
304                    io::Error::new(
305                        io::ErrorKind::InvalidData,
306                        format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
307                    )
308                })?;
309
310                // Read original (uncompressed) length — used to pre-allocate decompression buffer
311                let mut orig_len_buf = [0u8; 4];
312                reader.read_exact(&mut orig_len_buf)?;
313                running_crc = crc32_update(running_crc, &orig_len_buf);
314                let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
315
316                // Read compressed data length
317                let mut len_buf = [0u8; 4];
318                reader.read_exact(&mut len_buf)?;
319                running_crc = crc32_update(running_crc, &len_buf);
320                let len = u32::from_le_bytes(len_buf) as usize;
321
322                // Read compressed data
323                let mut compressed = vec![0u8; len];
324                reader.read_exact(&mut compressed)?;
325                running_crc = crc32_update(running_crc, &compressed);
326
327                // Decompress
328                let data = match compression {
329                    Compression::Zstd => {
330                        let mut out = vec![0u8; orig_len];
331                        zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
332                            io::Error::new(
333                                io::ErrorKind::InvalidData,
334                                format!("WAL zstd decompress failed: {e}"),
335                            )
336                        })?;
337                        out
338                    }
339                    Compression::None => compressed,
340                };
341
342                WalRecord::PageWrite {
343                    tx_id,
344                    page_id,
345                    data,
346                }
347            }
348            RecordType::TxCommitBatch => {
349                let mut tx_buf = [0u8; 8];
350                reader.read_exact(&mut tx_buf)?;
351                running_crc = crc32_update(running_crc, &tx_buf);
352                let tx_id = u64::from_le_bytes(tx_buf);
353
354                let mut count_buf = [0u8; 4];
355                reader.read_exact(&mut count_buf)?;
356                running_crc = crc32_update(running_crc, &count_buf);
357                let count = u32::from_le_bytes(count_buf) as usize;
358
359                let mut actions = Vec::with_capacity(count);
360                for _ in 0..count {
361                    let mut len_buf = [0u8; 4];
362                    reader.read_exact(&mut len_buf)?;
363                    running_crc = crc32_update(running_crc, &len_buf);
364                    let len = u32::from_le_bytes(len_buf) as usize;
365
366                    let mut action = vec![0u8; len];
367                    reader.read_exact(&mut action)?;
368                    running_crc = crc32_update(running_crc, &action);
369                    actions.push(action);
370                }
371
372                WalRecord::TxCommitBatch { tx_id, actions }
373            }
374            RecordType::FullPageImage => {
375                let mut tx_buf = [0u8; 8];
376                reader.read_exact(&mut tx_buf)?;
377                running_crc = crc32_update(running_crc, &tx_buf);
378                let tx_id = u64::from_le_bytes(tx_buf);
379
380                let mut page_buf = [0u8; 4];
381                reader.read_exact(&mut page_buf)?;
382                running_crc = crc32_update(running_crc, &page_buf);
383                let page_id = u32::from_le_bytes(page_buf);
384
385                let mut epoch_buf = [0u8; 8];
386                reader.read_exact(&mut epoch_buf)?;
387                running_crc = crc32_update(running_crc, &epoch_buf);
388                let ckpt_epoch = u64::from_le_bytes(epoch_buf);
389
390                let mut len_buf = [0u8; 4];
391                reader.read_exact(&mut len_buf)?;
392                running_crc = crc32_update(running_crc, &len_buf);
393                let len = u32::from_le_bytes(len_buf) as usize;
394
395                let mut data = vec![0u8; len];
396                reader.read_exact(&mut data)?;
397                running_crc = crc32_update(running_crc, &data);
398
399                WalRecord::FullPageImage {
400                    tx_id,
401                    page_id,
402                    ckpt_epoch,
403                    data,
404                }
405            }
406            RecordType::Checkpoint => {
407                let mut buf = [0u8; 8];
408                reader.read_exact(&mut buf)?;
409                running_crc = crc32_update(running_crc, &buf);
410                let lsn = u64::from_le_bytes(buf);
411                WalRecord::Checkpoint { lsn }
412            }
413        };
414
415        // Verify checksum
416        let mut crc_buf = [0u8; 4];
417        reader.read_exact(&mut crc_buf)?;
418        let stored_crc = u32::from_le_bytes(crc_buf);
419
420        if running_crc != stored_crc {
421            return Err(io::Error::new(
422                io::ErrorKind::InvalidData,
423                "WAL record checksum mismatch",
424            ));
425        }
426
427        Ok(Some(record))
428    }
429}
430
#[cfg(test)]
mod tests {
    // Unit tests for the WAL record codec: tag decoding, encoded sizes,
    // encode/read round-trips, and detection of corrupt or truncated input.
    use super::*;
    use std::io::{Cursor, ErrorKind};

    // ==================== RecordType Tests ====================

    #[test]
    fn test_record_type_from_u8() {
        assert_eq!(RecordType::from_u8(1), Some(RecordType::Begin));
        assert_eq!(RecordType::from_u8(2), Some(RecordType::Commit));
        assert_eq!(RecordType::from_u8(3), Some(RecordType::Rollback));
        assert_eq!(RecordType::from_u8(4), Some(RecordType::PageWrite));
        assert_eq!(RecordType::from_u8(5), Some(RecordType::Checkpoint));
        assert_eq!(
            RecordType::from_u8(6),
            Some(RecordType::PageWriteCompressed)
        );
        assert_eq!(RecordType::from_u8(7), Some(RecordType::TxCommitBatch));
        assert_eq!(RecordType::from_u8(8), Some(RecordType::FullPageImage));
    }

    #[test]
    fn test_record_type_invalid() {
        assert_eq!(RecordType::from_u8(0), None);
        assert_eq!(RecordType::from_u8(9), None);
        assert_eq!(RecordType::from_u8(255), None);
    }

    // ==================== WalRecord::encode Tests ====================

    #[test]
    fn test_encode_begin() {
        let record = WalRecord::Begin { tx_id: 12345 };
        let encoded = record.encode();

        // Type (1) + TxID (8) + Checksum (4) = 13 bytes
        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Begin as u8);
    }

    #[test]
    fn test_encode_commit() {
        let record = WalRecord::Commit { tx_id: 99999 };
        let encoded = record.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Commit as u8);
    }

    #[test]
    fn test_encode_rollback() {
        let record = WalRecord::Rollback { tx_id: 54321 };
        let encoded = record.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Rollback as u8);
    }

    #[test]
    fn test_encode_checkpoint() {
        let record = WalRecord::Checkpoint { lsn: 1000000 };
        let encoded = record.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Checkpoint as u8);
    }

    #[test]
    fn test_encode_page_write_small() {
        // Small data (< COMPRESS_THRESHOLD) stays uncompressed.
        let data = vec![1, 2, 3, 4, 5];
        let record = WalRecord::PageWrite {
            tx_id: 100,
            page_id: 42,
            data: data.clone(),
        };
        let encoded = record.encode();

        // Type (1) + TxID (8) + PageID (4) + Len (4) + Data (5) + Checksum (4) = 26 bytes
        assert_eq!(encoded.len(), 26);
        assert_eq!(encoded[0], RecordType::PageWrite as u8);
    }

    #[test]
    fn test_encode_page_write_empty_data() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data: vec![],
        };
        let encoded = record.encode();

        // Type (1) + TxID (8) + PageID (4) + Len (4) + Checksum (4) = 21 bytes
        assert_eq!(encoded.len(), 21);
    }

    #[test]
    fn test_encode_tx_commit_batch() {
        let record = WalRecord::TxCommitBatch {
            tx_id: 7,
            actions: vec![b"insert".to_vec(), b"update".to_vec()],
        };
        let encoded = record.encode();

        // Type (1) + TxID (8) + Count (4) + 2 × (Len (4) + Data (6)) + Checksum (4) = 37 bytes
        assert_eq!(encoded.len(), 37);
        assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
    }

    #[test]
    fn test_encode_incompressible_large_payload_stays_plain() {
        // xorshift32 output is effectively random to zstd, so the compressed
        // frame is larger than the input and the encoder must fall back to
        // the plain PageWrite layout.
        let mut state: u32 = 0x9E37_79B9;
        let data: Vec<u8> = (0..4096)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 17;
                state ^= state << 5;
                (state >> 24) as u8
            })
            .collect();
        let original = WalRecord::PageWrite {
            tx_id: 5,
            page_id: 6,
            data,
        };
        let encoded = original.encode();
        assert_eq!(encoded[0], RecordType::PageWrite as u8);

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(decoded, original);
    }

    // ==================== WalRecord::read Tests ====================

    #[test]
    fn test_read_begin_roundtrip() {
        let original = WalRecord::Begin { tx_id: 42 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_commit_roundtrip() {
        let original = WalRecord::Commit { tx_id: 999 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_rollback_roundtrip() {
        let original = WalRecord::Rollback { tx_id: 777 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_checkpoint_roundtrip() {
        let original = WalRecord::Checkpoint { lsn: 123456789 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_page_write_roundtrip() {
        let original = WalRecord::PageWrite {
            tx_id: 50,
            page_id: 100,
            data: vec![10, 20, 30, 40, 50, 60, 70, 80],
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_tx_commit_batch_roundtrip() {
        let original = WalRecord::TxCommitBatch {
            tx_id: 42,
            actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_empty_payload_roundtrips() {
        let records = vec![
            WalRecord::TxCommitBatch {
                tx_id: 9,
                actions: vec![],
            },
            WalRecord::TxCommitBatch {
                tx_id: 10,
                actions: vec![vec![], b"x".to_vec()],
            },
            WalRecord::FullPageImage {
                tx_id: 1,
                page_id: 1,
                ckpt_epoch: 0,
                data: vec![],
            },
        ];

        for original in &records {
            let mut cursor = Cursor::new(original.encode());
            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(&decoded, original);
        }
    }

    #[test]
    fn test_read_page_write_large_data() {
        // Large enough to trigger compression.
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        let original = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data,
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        // Round-trip: decoded data matches original (even if encoded differently).
        assert_eq!(decoded, original);
    }

    #[test]
    fn page_write_compressed_roundtrip() {
        // Highly compressible payload: 1 KiB of repeated bytes.
        let data = vec![0xABu8; 1024];
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: data.clone(),
        };
        let encoded = record.encode();

        // Should be stored as PageWriteCompressed (compressible > threshold).
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        // And round-trip decoding recovers original.
        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(
            decoded,
            WalRecord::PageWrite {
                tx_id: 7,
                page_id: 3,
                data
            }
        );
    }

    #[test]
    fn full_page_image_roundtrip() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 251) as u8).collect();
        let original = WalRecord::FullPageImage {
            tx_id: 11,
            page_id: 9,
            ckpt_epoch: 42,
            data: data.clone(),
        };
        let encoded = original.encode();
        assert_eq!(encoded[0], RecordType::FullPageImage as u8);

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn full_page_image_checksum_mismatch_detected() {
        let original = WalRecord::FullPageImage {
            tx_id: 1,
            page_id: 2,
            ckpt_epoch: 3,
            data: vec![0xAA; 32],
        };
        let mut encoded = original.encode();
        let mid = encoded.len() / 2;
        encoded[mid] ^= 0xFF;
        let mut cursor = Cursor::new(encoded);
        assert!(WalRecord::read(&mut cursor).is_err());
    }

    #[test]
    fn test_read_eof() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let result = WalRecord::read(&mut cursor).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_read_invalid_record_type() {
        let buf = vec![99, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // Invalid type 99
        let mut cursor = Cursor::new(buf);
        let result = WalRecord::read(&mut cursor);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_checksum_mismatch() {
        let record = WalRecord::Begin { tx_id: 42 };
        let mut encoded = record.encode();

        // Corrupt the last byte (checksum)
        let len = encoded.len();
        encoded[len - 1] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let result = WalRecord::read(&mut cursor);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_data_corruption() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![1, 2, 3, 4],
        };
        let mut encoded = record.encode();

        // Corrupt the first data byte. Data starts after
        // Type (1) + TxID (8) + PageID (4) + Len (4) = 17 bytes; an earlier
        // byte would hit the length field and fail on EOF instead of
        // exercising the checksum.
        const DATA_OFFSET: usize = 1 + 8 + 4 + 4;
        encoded[DATA_OFFSET] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData); // Checksum will fail
    }

    #[test]
    fn test_read_truncated_record_is_unexpected_eof() {
        // A torn write at the log tail: the record stops inside its payload.
        let record = WalRecord::PageWrite {
            tx_id: 3,
            page_id: 4,
            data: vec![9; 16],
        };
        let mut encoded = record.encode();
        let cut = encoded.len() - 6; // drops the CRC and the last 2 data bytes
        encoded.truncate(cut);

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_read_compressed_payload_corruption_detected() {
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: vec![0xAB; 1024],
        };
        let mut encoded = record.encode();
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        // Payload starts after Type (1) + TxID (8) + PageID (4) +
        // Compression (1) + OrigLen (4) + DataLen (4) = 22 bytes.
        const PAYLOAD_OFFSET: usize = 22;
        assert!(encoded.len() > PAYLOAD_OFFSET + 4);
        encoded[PAYLOAD_OFFSET] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_unknown_compression_tag_rejected() {
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: vec![0xAB; 1024],
        };
        let mut encoded = record.encode();
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        // Compression tag sits after Type (1) + TxID (8) + PageID (4) = 13 bytes.
        encoded[13] = 0x7F;

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    // ==================== Multiple Records Tests ====================

    #[test]
    fn test_multiple_records_sequential() {
        let records = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 20,
                data: vec![4, 5, 6],
            },
            WalRecord::Commit { tx_id: 1 },
            WalRecord::Checkpoint { lsn: 100 },
        ];

        // Encode all
        let mut buf = Vec::new();
        for r in &records {
            buf.extend_from_slice(&r.encode());
        }

        // Read them back
        let mut cursor = Cursor::new(buf);
        for expected in &records {
            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(&decoded, expected);
        }

        // Next read should return None (EOF)
        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
    }

    // ==================== Constants Tests ====================

    #[test]
    fn test_wal_magic() {
        assert_eq!(WAL_MAGIC, b"RDBW");
    }

    #[test]
    fn test_wal_version() {
        assert_eq!(WAL_VERSION, 2);
    }
}