// reddb_server/storage/wal/record.rs

1use crate::storage::engine::crc32::{crc32, crc32_update};
2use std::io::{self, Read};
3
/// Magic bytes that open every WAL file: ASCII `R`, `D`, `B`, `W`.
pub const WAL_MAGIC: &[u8; 4] = &[b'R', b'D', b'B', b'W'];

/// On-disk WAL format version. Version 2 added the compressed page-write
/// record encoding alongside the original v1 uncompressed one.
pub const WAL_VERSION: u8 = 2;

/// Page payloads shorter than this many bytes are never offered to zstd:
/// at that size the compression framing costs more than it can save.
const COMPRESS_THRESHOLD: usize = 256;
13
/// Codec tag stored in the algorithm byte of a `PageWriteCompressed` record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Compression {
    /// The payload is stored verbatim.
    None = 0,
    /// The payload is zstd-compressed.
    Zstd = 1,
}

impl Compression {
    /// Maps an on-disk codec tag back to its variant.
    ///
    /// Returns `Option::None` for tags no known codec uses. The lookup compares
    /// against each variant's real discriminant, so the table cannot drift
    /// from the `#[repr(u8)]` values above.
    fn from_u8(v: u8) -> Option<Self> {
        [Self::None, Self::Zstd]
            .iter()
            .copied()
            .find(|codec| *codec as u8 == v)
    }
}
31
/// Type byte that opens every WAL record.
///
/// Every record is framed as `[Type: 1][fields...][CRC: 4]`. The CRC32 covers
/// every preceding byte of the record, the type byte included. All integers
/// below are little-endian.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    /// Transaction start. Layout after the type byte: `[TxID: 8][CRC: 4]`.
    Begin = 1,
    /// Transaction commit. Layout after the type byte: `[TxID: 8][CRC: 4]`.
    Commit = 2,
    /// Transaction rollback. Layout after the type byte: `[TxID: 8][CRC: 4]`.
    Rollback = 3,
    /// Uncompressed page write (v1 format). Still emitted for payloads that
    /// are too small to be worth compressing or that do not shrink.
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][PageID: 4][DataLen: 4][Data: N][CRC: 4]
    /// ```
    PageWrite = 4,
    /// Checkpoint marker. Layout after the type byte: `[LSN: 8][CRC: 4]`.
    Checkpoint = 5,
    /// Compressed page write (v2 format).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][PageID: 4][Compression: 1][OrigLen: 4][DataLen: 4][Data: N][CRC: 4]
    /// ```
    /// `OrigLen` is the uncompressed size and is used to size the
    /// decompression buffer.
    PageWriteCompressed = 6,
    /// Logical autocommit transaction commit batch (v2 format).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][ActionCount: 4][[DataLen: 4][Data: N]...][CRC: 4]
    /// ```
    TxCommitBatch = 7,
    /// Full-page image (FPI): a complete page captured before its first
    /// modification within a checkpoint cycle. Torn-page recovery replays it
    /// before redo replays later `PageWrite`s, which lets `fold_dwb_into_wal`
    /// retire the `-dwb` sidecar (gh-478).
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [TxID: 8][PageID: 4][CkptEpoch: 8][DataLen: 4][Data: N][CRC: 4]
    /// ```
    FullPageImage = 8,
    /// Logical vector insert, used by vector-turbo WAL replay.
    ///
    /// Layout (after the type byte):
    /// ```text
    /// [NameLen: 4][Collection: N (UTF-8)][EntityID: 8][Dims: 4][f32: 4 x Dims][CRC: 4]
    /// ```
    VectorInsert = 9,
}

impl RecordType {
    /// Decodes a record-type byte. Returns `None` when the byte does not
    /// belong to any known record type.
    pub fn from_u8(v: u8) -> Option<Self> {
        // Compare against each variant's actual discriminant instead of
        // restating the numbers by hand.
        [
            Self::Begin,
            Self::Commit,
            Self::Rollback,
            Self::PageWrite,
            Self::Checkpoint,
            Self::PageWriteCompressed,
            Self::TxCommitBatch,
            Self::FullPageImage,
            Self::VectorInsert,
        ]
        .iter()
        .copied()
        .find(|kind| *kind as u8 == v)
    }
}
90
/// One logical entry of the write-ahead log, as seen by callers.
///
/// Compression is purely an encoding detail. `encode` may store a `PageWrite`
/// as a compressed record, and `read` always hands back a `PageWrite` that
/// carries the uncompressed bytes.
#[derive(Clone, Debug, PartialEq)]
pub enum WalRecord {
    /// Opens transaction `tx_id`.
    Begin { tx_id: u64 },
    /// Commits transaction `tx_id`.
    Commit { tx_id: u64 },
    /// Rolls back transaction `tx_id`.
    Rollback { tx_id: u64 },
    /// New contents for page `page_id`, written by transaction `tx_id`.
    /// `data` is always the uncompressed payload.
    PageWrite {
        tx_id: u64,
        page_id: u32,
        data: Vec<u8>,
    },
    /// Atomic logical commit batch. Recovery applies every action, in order,
    /// only if the complete record and its checksum were read back intact.
    TxCommitBatch { tx_id: u64, actions: Vec<Vec<u8>> },
    /// Pristine bytes of page `page_id`, captured before the page's first
    /// modification in checkpoint cycle `ckpt_epoch`. Recovery applies these
    /// ahead of redo so torn page writes heal without the `-dwb` sidecar.
    FullPageImage {
        tx_id: u64,
        page_id: u32,
        ckpt_epoch: u64,
        data: Vec<u8>,
    },
    /// Logical FP32 vector insert of `entity_id` into `collection`. Recovery
    /// can replay it into the in-memory vector-turbo index without snapshot
    /// files.
    VectorInsert {
        collection: String,
        entity_id: u64,
        vector: Vec<f32>,
    },
    /// Checkpoint marker: the LSN up to which pages have been flushed.
    Checkpoint { lsn: u64 },
}
129
130impl WalRecord {
131    /// Serialize record to bytes (including checksum).
132    ///
133    /// `PageWrite` records whose payload is ≥ `COMPRESS_THRESHOLD` bytes are
134    /// compressed with zstd level 3 and emitted as `PageWriteCompressed`.
135    /// Smaller payloads use the plain `PageWrite` encoding (no overhead).
136    pub fn encode(&self) -> Vec<u8> {
137        let mut buf = Vec::new();
138
139        // Layout (non-PageWrite):
140        // [Type: 1]
141        // [TxID/LSN: 8]
142        // [Checksum: 4]
143        //
144        // PageWrite (uncompressed):
145        // [Type: 1][TxID: 8][PageID: 4][DataLen: 4][Data: N][CRC: 4]
146        //
147        // PageWriteCompressed:
148        // [Type: 1][TxID: 8][PageID: 4][Compression: 1][OrigLen: 4][DataLen: 4][Data: N][CRC: 4]
149        //
150        // TxCommitBatch:
151        // [Type: 1][TxID: 8][ActionCount: 4][[DataLen: 4][Data: N]...][CRC: 4]
152
153        match self {
154            WalRecord::Begin { tx_id } => {
155                buf.push(RecordType::Begin as u8);
156                buf.extend_from_slice(&tx_id.to_le_bytes());
157            }
158            WalRecord::Commit { tx_id } => {
159                buf.push(RecordType::Commit as u8);
160                buf.extend_from_slice(&tx_id.to_le_bytes());
161            }
162            WalRecord::Rollback { tx_id } => {
163                buf.push(RecordType::Rollback as u8);
164                buf.extend_from_slice(&tx_id.to_le_bytes());
165            }
166            WalRecord::PageWrite {
167                tx_id,
168                page_id,
169                data,
170            } => {
171                if data.len() >= COMPRESS_THRESHOLD {
172                    // Try zstd compression; fall back to uncompressed if it expands.
173                    if let Ok(compressed) =
174                        zstd::bulk::compress(data.as_slice(), /* level */ 3)
175                    {
176                        if compressed.len() < data.len() {
177                            // Compressed is smaller — use compressed format.
178                            buf.push(RecordType::PageWriteCompressed as u8);
179                            buf.extend_from_slice(&tx_id.to_le_bytes());
180                            buf.extend_from_slice(&page_id.to_le_bytes());
181                            buf.push(Compression::Zstd as u8);
182                            buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); // orig_len
183                            buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
184                            buf.extend_from_slice(&compressed);
185                            let checksum = crc32(&buf);
186                            buf.extend_from_slice(&checksum.to_le_bytes());
187                            return buf;
188                        }
189                    }
190                }
191                // Uncompressed path (small payload or compression expanded).
192                buf.push(RecordType::PageWrite as u8);
193                buf.extend_from_slice(&tx_id.to_le_bytes());
194                buf.extend_from_slice(&page_id.to_le_bytes());
195                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
196                buf.extend_from_slice(data);
197            }
198            WalRecord::TxCommitBatch { tx_id, actions } => {
199                buf.push(RecordType::TxCommitBatch as u8);
200                buf.extend_from_slice(&tx_id.to_le_bytes());
201                buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
202                for action in actions {
203                    buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
204                    buf.extend_from_slice(action);
205                }
206            }
207            WalRecord::FullPageImage {
208                tx_id,
209                page_id,
210                ckpt_epoch,
211                data,
212            } => {
213                buf.push(RecordType::FullPageImage as u8);
214                buf.extend_from_slice(&tx_id.to_le_bytes());
215                buf.extend_from_slice(&page_id.to_le_bytes());
216                buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
217                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
218                buf.extend_from_slice(data);
219            }
220            WalRecord::VectorInsert {
221                collection,
222                entity_id,
223                vector,
224            } => {
225                buf.push(RecordType::VectorInsert as u8);
226                buf.extend_from_slice(&(collection.len() as u32).to_le_bytes());
227                buf.extend_from_slice(collection.as_bytes());
228                buf.extend_from_slice(&entity_id.to_le_bytes());
229                buf.extend_from_slice(&(vector.len() as u32).to_le_bytes());
230                for value in vector {
231                    buf.extend_from_slice(&value.to_le_bytes());
232                }
233            }
234            WalRecord::Checkpoint { lsn } => {
235                buf.push(RecordType::Checkpoint as u8);
236                buf.extend_from_slice(&lsn.to_le_bytes());
237            }
238        }
239
240        // Calculate and append checksum
241        let checksum = crc32(&buf);
242        buf.extend_from_slice(&checksum.to_le_bytes());
243
244        buf
245    }
246
247    /// Read a record from a reader.
248    ///
249    /// Handles both v1 (`PageWrite`) and v2 (`PageWriteCompressed`) record
250    /// formats transparently — callers always receive uncompressed data.
251    pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
252        // Read type byte
253        let mut type_buf = [0u8; 1];
254        match reader.read_exact(&mut type_buf) {
255            Ok(_) => (),
256            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
257            Err(e) => return Err(e),
258        };
259
260        let record_type = RecordType::from_u8(type_buf[0])
261            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
262
263        // Start checksum calculation
264        let mut running_crc = crc32_update(0, &type_buf);
265
266        let record = match record_type {
267            RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
268                let mut buf = [0u8; 8];
269                reader.read_exact(&mut buf)?;
270                running_crc = crc32_update(running_crc, &buf);
271                let tx_id = u64::from_le_bytes(buf);
272
273                match record_type {
274                    RecordType::Begin => WalRecord::Begin { tx_id },
275                    RecordType::Commit => WalRecord::Commit { tx_id },
276                    RecordType::Rollback => WalRecord::Rollback { tx_id },
277                    _ => unreachable!(),
278                }
279            }
280            RecordType::PageWrite => {
281                // Read TxID
282                let mut tx_buf = [0u8; 8];
283                reader.read_exact(&mut tx_buf)?;
284                running_crc = crc32_update(running_crc, &tx_buf);
285                let tx_id = u64::from_le_bytes(tx_buf);
286
287                // Read PageID
288                let mut page_buf = [0u8; 4];
289                reader.read_exact(&mut page_buf)?;
290                running_crc = crc32_update(running_crc, &page_buf);
291                let page_id = u32::from_le_bytes(page_buf);
292
293                // Read Length
294                let mut len_buf = [0u8; 4];
295                reader.read_exact(&mut len_buf)?;
296                running_crc = crc32_update(running_crc, &len_buf);
297                let len = u32::from_le_bytes(len_buf) as usize;
298
299                // Read Data
300                let mut data = vec![0u8; len];
301                reader.read_exact(&mut data)?;
302                running_crc = crc32_update(running_crc, &data);
303
304                WalRecord::PageWrite {
305                    tx_id,
306                    page_id,
307                    data,
308                }
309            }
310            RecordType::PageWriteCompressed => {
311                // Read TxID
312                let mut tx_buf = [0u8; 8];
313                reader.read_exact(&mut tx_buf)?;
314                running_crc = crc32_update(running_crc, &tx_buf);
315                let tx_id = u64::from_le_bytes(tx_buf);
316
317                // Read PageID
318                let mut page_buf = [0u8; 4];
319                reader.read_exact(&mut page_buf)?;
320                running_crc = crc32_update(running_crc, &page_buf);
321                let page_id = u32::from_le_bytes(page_buf);
322
323                // Read Compression algorithm byte
324                let mut comp_buf = [0u8; 1];
325                reader.read_exact(&mut comp_buf)?;
326                running_crc = crc32_update(running_crc, &comp_buf);
327                let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
328                    io::Error::new(
329                        io::ErrorKind::InvalidData,
330                        format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
331                    )
332                })?;
333
334                // Read original (uncompressed) length — used to pre-allocate decompression buffer
335                let mut orig_len_buf = [0u8; 4];
336                reader.read_exact(&mut orig_len_buf)?;
337                running_crc = crc32_update(running_crc, &orig_len_buf);
338                let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
339
340                // Read compressed data length
341                let mut len_buf = [0u8; 4];
342                reader.read_exact(&mut len_buf)?;
343                running_crc = crc32_update(running_crc, &len_buf);
344                let len = u32::from_le_bytes(len_buf) as usize;
345
346                // Read compressed data
347                let mut compressed = vec![0u8; len];
348                reader.read_exact(&mut compressed)?;
349                running_crc = crc32_update(running_crc, &compressed);
350
351                // Decompress
352                let data = match compression {
353                    Compression::Zstd => {
354                        let mut out = vec![0u8; orig_len];
355                        zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
356                            io::Error::new(
357                                io::ErrorKind::InvalidData,
358                                format!("WAL zstd decompress failed: {e}"),
359                            )
360                        })?;
361                        out
362                    }
363                    Compression::None => compressed,
364                };
365
366                WalRecord::PageWrite {
367                    tx_id,
368                    page_id,
369                    data,
370                }
371            }
372            RecordType::TxCommitBatch => {
373                let mut tx_buf = [0u8; 8];
374                reader.read_exact(&mut tx_buf)?;
375                running_crc = crc32_update(running_crc, &tx_buf);
376                let tx_id = u64::from_le_bytes(tx_buf);
377
378                let mut count_buf = [0u8; 4];
379                reader.read_exact(&mut count_buf)?;
380                running_crc = crc32_update(running_crc, &count_buf);
381                let count = u32::from_le_bytes(count_buf) as usize;
382
383                let mut actions = Vec::with_capacity(count);
384                for _ in 0..count {
385                    let mut len_buf = [0u8; 4];
386                    reader.read_exact(&mut len_buf)?;
387                    running_crc = crc32_update(running_crc, &len_buf);
388                    let len = u32::from_le_bytes(len_buf) as usize;
389
390                    let mut action = vec![0u8; len];
391                    reader.read_exact(&mut action)?;
392                    running_crc = crc32_update(running_crc, &action);
393                    actions.push(action);
394                }
395
396                WalRecord::TxCommitBatch { tx_id, actions }
397            }
398            RecordType::VectorInsert => {
399                let mut len_buf = [0u8; 4];
400                reader.read_exact(&mut len_buf)?;
401                running_crc = crc32_update(running_crc, &len_buf);
402                let collection_len = u32::from_le_bytes(len_buf) as usize;
403
404                let mut collection_buf = vec![0u8; collection_len];
405                reader.read_exact(&mut collection_buf)?;
406                running_crc = crc32_update(running_crc, &collection_buf);
407                let collection = String::from_utf8(collection_buf).map_err(|err| {
408                    io::Error::new(
409                        io::ErrorKind::InvalidData,
410                        format!("invalid collection utf8: {err}"),
411                    )
412                })?;
413
414                let mut entity_buf = [0u8; 8];
415                reader.read_exact(&mut entity_buf)?;
416                running_crc = crc32_update(running_crc, &entity_buf);
417                let entity_id = u64::from_le_bytes(entity_buf);
418
419                let mut count_buf = [0u8; 4];
420                reader.read_exact(&mut count_buf)?;
421                running_crc = crc32_update(running_crc, &count_buf);
422                let count = u32::from_le_bytes(count_buf) as usize;
423
424                let mut vector = Vec::with_capacity(count);
425                for _ in 0..count {
426                    let mut value_buf = [0u8; 4];
427                    reader.read_exact(&mut value_buf)?;
428                    running_crc = crc32_update(running_crc, &value_buf);
429                    vector.push(f32::from_le_bytes(value_buf));
430                }
431
432                WalRecord::VectorInsert {
433                    collection,
434                    entity_id,
435                    vector,
436                }
437            }
438            RecordType::FullPageImage => {
439                let mut tx_buf = [0u8; 8];
440                reader.read_exact(&mut tx_buf)?;
441                running_crc = crc32_update(running_crc, &tx_buf);
442                let tx_id = u64::from_le_bytes(tx_buf);
443
444                let mut page_buf = [0u8; 4];
445                reader.read_exact(&mut page_buf)?;
446                running_crc = crc32_update(running_crc, &page_buf);
447                let page_id = u32::from_le_bytes(page_buf);
448
449                let mut epoch_buf = [0u8; 8];
450                reader.read_exact(&mut epoch_buf)?;
451                running_crc = crc32_update(running_crc, &epoch_buf);
452                let ckpt_epoch = u64::from_le_bytes(epoch_buf);
453
454                let mut len_buf = [0u8; 4];
455                reader.read_exact(&mut len_buf)?;
456                running_crc = crc32_update(running_crc, &len_buf);
457                let len = u32::from_le_bytes(len_buf) as usize;
458
459                let mut data = vec![0u8; len];
460                reader.read_exact(&mut data)?;
461                running_crc = crc32_update(running_crc, &data);
462
463                WalRecord::FullPageImage {
464                    tx_id,
465                    page_id,
466                    ckpt_epoch,
467                    data,
468                }
469            }
470            RecordType::Checkpoint => {
471                let mut buf = [0u8; 8];
472                reader.read_exact(&mut buf)?;
473                running_crc = crc32_update(running_crc, &buf);
474                let lsn = u64::from_le_bytes(buf);
475                WalRecord::Checkpoint { lsn }
476            }
477        };
478
479        // Verify checksum
480        let mut crc_buf = [0u8; 4];
481        reader.read_exact(&mut crc_buf)?;
482        let stored_crc = u32::from_le_bytes(crc_buf);
483
484        if running_crc != stored_crc {
485            return Err(io::Error::new(
486                io::ErrorKind::InvalidData,
487                "WAL record checksum mismatch",
488            ));
489        }
490
491        Ok(Some(record))
492    }
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Cursor, ErrorKind};

    // ==================== RecordType Tests ====================

    #[test]
    fn test_record_type_from_u8() {
        assert_eq!(RecordType::from_u8(1), Some(RecordType::Begin));
        assert_eq!(RecordType::from_u8(2), Some(RecordType::Commit));
        assert_eq!(RecordType::from_u8(3), Some(RecordType::Rollback));
        assert_eq!(RecordType::from_u8(4), Some(RecordType::PageWrite));
        assert_eq!(RecordType::from_u8(5), Some(RecordType::Checkpoint));
        assert_eq!(
            RecordType::from_u8(6),
            Some(RecordType::PageWriteCompressed)
        );
        assert_eq!(RecordType::from_u8(7), Some(RecordType::TxCommitBatch));
        assert_eq!(RecordType::from_u8(8), Some(RecordType::FullPageImage));
        assert_eq!(RecordType::from_u8(9), Some(RecordType::VectorInsert));
    }

    #[test]
    fn test_record_type_invalid() {
        assert_eq!(RecordType::from_u8(0), None);
        // 9 is VectorInsert; 10 is the first unassigned discriminant.
        assert_eq!(RecordType::from_u8(10), None);
        assert_eq!(RecordType::from_u8(255), None);
    }

    #[test]
    fn test_record_type_discriminants_roundtrip() {
        for byte in 1u8..=9 {
            let kind = RecordType::from_u8(byte).expect("assigned record type");
            assert_eq!(kind as u8, byte);
        }
    }

    // ==================== WalRecord::encode Tests ====================

    #[test]
    fn test_encode_begin() {
        let record = WalRecord::Begin { tx_id: 12345 };
        let encoded = record.encode();

        // Type (1) + TxID (8) + Checksum (4) = 13 bytes
        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Begin as u8);
    }

    #[test]
    fn test_encode_commit() {
        let record = WalRecord::Commit { tx_id: 99999 };
        let encoded = record.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Commit as u8);
    }

    #[test]
    fn test_encode_rollback() {
        let record = WalRecord::Rollback { tx_id: 54321 };
        let encoded = record.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Rollback as u8);
    }

    #[test]
    fn test_encode_checkpoint() {
        let record = WalRecord::Checkpoint { lsn: 1000000 };
        let encoded = record.encode();

        assert_eq!(encoded.len(), 13);
        assert_eq!(encoded[0], RecordType::Checkpoint as u8);
    }

    #[test]
    fn test_encode_page_write_small() {
        // Small data (< COMPRESS_THRESHOLD) stays uncompressed.
        let data = vec![1, 2, 3, 4, 5];
        let record = WalRecord::PageWrite {
            tx_id: 100,
            page_id: 42,
            data: data.clone(),
        };
        let encoded = record.encode();

        // Type (1) + TxID (8) + PageID (4) + Len (4) + Data (5) + Checksum (4) = 26 bytes
        assert_eq!(encoded.len(), 26);
        assert_eq!(encoded[0], RecordType::PageWrite as u8);
    }

    #[test]
    fn test_encode_page_write_empty_data() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data: vec![],
        };
        let encoded = record.encode();

        // Type (1) + TxID (8) + PageID (4) + Len (4) + Checksum (4) = 21 bytes
        assert_eq!(encoded.len(), 21);
    }

    #[test]
    fn test_encode_tx_commit_batch() {
        let record = WalRecord::TxCommitBatch {
            tx_id: 7,
            actions: vec![b"insert".to_vec(), b"update".to_vec()],
        };
        let encoded = record.encode();

        assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
    }

    // ==================== WalRecord::read Tests ====================

    #[test]
    fn test_read_begin_roundtrip() {
        let original = WalRecord::Begin { tx_id: 42 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_commit_roundtrip() {
        let original = WalRecord::Commit { tx_id: 999 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_rollback_roundtrip() {
        let original = WalRecord::Rollback { tx_id: 777 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_checkpoint_roundtrip() {
        let original = WalRecord::Checkpoint { lsn: 123456789 };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_page_write_roundtrip() {
        let original = WalRecord::PageWrite {
            tx_id: 50,
            page_id: 100,
            data: vec![10, 20, 30, 40, 50, 60, 70, 80],
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_read_tx_commit_batch_roundtrip() {
        let original = WalRecord::TxCommitBatch {
            tx_id: 42,
            actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_vector_insert_roundtrip() {
        let original = WalRecord::VectorInsert {
            collection: "turbo".to_string(),
            entity_id: 42,
            vector: vec![1.0, -0.5, 0.25],
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        assert_eq!(decoded, original);
    }

    #[test]
    fn test_empty_payloads_roundtrip() {
        let records = vec![
            WalRecord::TxCommitBatch {
                tx_id: 1,
                actions: vec![],
            },
            WalRecord::TxCommitBatch {
                tx_id: 2,
                actions: vec![vec![]],
            },
            WalRecord::VectorInsert {
                collection: String::new(),
                entity_id: 3,
                vector: vec![],
            },
            WalRecord::FullPageImage {
                tx_id: 4,
                page_id: 5,
                ckpt_epoch: 6,
                data: vec![],
            },
        ];

        for original in records {
            let mut cursor = Cursor::new(original.encode());
            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(decoded, original);
        }
    }

    #[test]
    fn test_read_page_write_large_data() {
        // Large enough to trigger compression.
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        let original = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 0,
            data,
        };
        let encoded = original.encode();

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();

        // Round-trip: decoded data matches original (even if encoded differently).
        assert_eq!(decoded, original);
    }

    #[test]
    fn page_write_compressed_roundtrip() {
        // Highly compressible payload: 1 KiB of repeated bytes.
        let data = vec![0xABu8; 1024];
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: data.clone(),
        };
        let encoded = record.encode();

        // Should be stored as PageWriteCompressed (compressible > threshold).
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        // And round-trip decoding recovers original.
        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(
            decoded,
            WalRecord::PageWrite {
                tx_id: 7,
                page_id: 3,
                data
            }
        );
    }

    #[test]
    fn page_write_compressed_corruption_detected() {
        let record = WalRecord::PageWrite {
            tx_id: 7,
            page_id: 3,
            data: vec![0xABu8; 1024],
        };
        let mut encoded = record.encode();
        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);

        // Flip the last compressed payload byte (just before the 4-byte CRC).
        let idx = encoded.len() - 5;
        encoded[idx] ^= 0xFF;

        let err = WalRecord::read(&mut Cursor::new(encoded)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn full_page_image_roundtrip() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 251) as u8).collect();
        let original = WalRecord::FullPageImage {
            tx_id: 11,
            page_id: 9,
            ckpt_epoch: 42,
            data: data.clone(),
        };
        let encoded = original.encode();
        assert_eq!(encoded[0], RecordType::FullPageImage as u8);

        let mut cursor = Cursor::new(encoded);
        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn full_page_image_checksum_mismatch_detected() {
        let original = WalRecord::FullPageImage {
            tx_id: 1,
            page_id: 2,
            ckpt_epoch: 3,
            data: vec![0xAA; 32],
        };
        let mut encoded = original.encode();
        let mid = encoded.len() / 2;
        encoded[mid] ^= 0xFF;
        let mut cursor = Cursor::new(encoded);
        assert!(WalRecord::read(&mut cursor).is_err());
    }

    #[test]
    fn vector_insert_corruption_detected() {
        let original = WalRecord::VectorInsert {
            collection: "turbo".to_string(),
            entity_id: 42,
            vector: vec![1.0, -0.5, 0.25],
        };
        let mut encoded = original.encode();
        // Type (1) + NameLen (4): offset 5 is the first collection-name byte.
        encoded[5] ^= 0xFF;

        let err = WalRecord::read(&mut Cursor::new(encoded)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_eof() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let result = WalRecord::read(&mut cursor).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_read_truncated_record_reports_unexpected_eof() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![9; 16],
        };
        let encoded = record.encode();

        // Cut inside the TxID, inside the data payload, and inside the CRC.
        for &cut in &[5, 20, encoded.len() - 1] {
            let mut cursor = Cursor::new(encoded[..cut].to_vec());
            let err = WalRecord::read(&mut cursor).unwrap_err();
            assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
        }
    }

    #[test]
    fn test_read_invalid_record_type() {
        let buf = vec![99, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // Invalid type 99
        let mut cursor = Cursor::new(buf);
        let result = WalRecord::read(&mut cursor);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_checksum_mismatch() {
        let record = WalRecord::Begin { tx_id: 42 };
        let mut encoded = record.encode();

        // Corrupt the last byte (checksum)
        let len = encoded.len();
        encoded[len - 1] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let result = WalRecord::read(&mut cursor);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_data_corruption() {
        let record = WalRecord::PageWrite {
            tx_id: 1,
            page_id: 2,
            data: vec![1, 2, 3, 4],
        };
        let mut encoded = record.encode();

        // Type (1) + TxID (8) + PageID (4) + Len (4) puts the first data byte
        // at offset 17. Offset 15 would hit the length field instead.
        encoded[17] ^= 0xFF;

        let mut cursor = Cursor::new(encoded);
        let err = WalRecord::read(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData); // Checksum mismatch
    }

    // ==================== Multiple Records Tests ====================

    #[test]
    fn test_multiple_records_sequential() {
        let records = vec![
            WalRecord::Begin { tx_id: 1 },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 10,
                data: vec![1, 2, 3],
            },
            WalRecord::PageWrite {
                tx_id: 1,
                page_id: 20,
                data: vec![4, 5, 6],
            },
            WalRecord::Commit { tx_id: 1 },
            WalRecord::Checkpoint { lsn: 100 },
        ];

        // Encode all
        let mut buf = Vec::new();
        for r in &records {
            buf.extend_from_slice(&r.encode());
        }

        // Read them back
        let mut cursor = Cursor::new(buf);
        for expected in &records {
            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
            assert_eq!(&decoded, expected);
        }

        // Next read should return None (EOF)
        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
    }

    // ==================== Constants Tests ====================

    #[test]
    fn test_wal_magic() {
        assert_eq!(WAL_MAGIC, b"RDBW");
    }

    #[test]
    fn test_wal_version() {
        assert_eq!(WAL_VERSION, 2);
    }
}