Skip to main content

reddb_server/storage/wal/
record.rs

1use crate::storage::engine::crc32::{crc32, crc32_update};
2use std::io::{self, Read};
3
4/// WAL file magic bytes (RDBW)
5pub const WAL_MAGIC: &[u8; 4] = b"RDBW";
6
7/// WAL file format version
8pub const WAL_VERSION: u8 = 3;
9pub const WAL_VERSION_V2: u8 = 2;
10pub const WAL_DEFAULT_TERM: u64 = crate::replication::DEFAULT_REPLICATION_TERM;
11
12/// Minimum payload size (bytes) to attempt zstd compression.
13/// Smaller records pay more overhead than benefit from compression.
14const COMPRESS_THRESHOLD: usize = 256;
15
/// Algorithm tag stored in the `Compression` byte of a
/// `PageWriteCompressed` record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Compression {
    /// Payload is stored as-is.
    None = 0,
    /// Payload is a zstd frame.
    Zstd = 1,
}

impl Compression {
    /// Maps an on-disk algorithm tag back to its variant; tags this build
    /// does not recognise yield `None`.
    fn from_u8(v: u8) -> Option<Self> {
        [Self::None, Self::Zstd]
            .iter()
            .copied()
            .find(|alg| *alg as u8 == v)
    }
}
33
/// Type of WAL record.
///
/// The discriminant is the on-disk type tag: the first byte of every record.
/// In format version 3 (`WAL_VERSION`) every record then carries an 8-byte
/// little-endian replication `Term`; version-2 files omit that field. The
/// layouts below describe version 3 and list the bytes *after* the type
/// byte. All integers are little-endian, and the trailing `CRC` covers every
/// byte from the type byte up to (but not including) the CRC itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    /// Transaction start: `[Term: 8][TxID: 8][CRC: 4]`.
    Begin = 1,
    /// Transaction commit: `[Term: 8][TxID: 8][CRC: 4]`.
    Commit = 2,
    /// Transaction rollback: `[Term: 8][TxID: 8][CRC: 4]`.
    Rollback = 3,
    /// Uncompressed page write (v1 format). Still written for payloads below
    /// the compression threshold, and whenever zstd fails to shrink the
    /// payload.
    ///
    /// ```text
    /// [Term: 8][TxID: 8][PageID: 4][DataLen: 4][Data: N][CRC: 4]
    /// ```
    PageWrite = 4,
    /// Checkpoint marker: `[Term: 8][LSN: 8][CRC: 4]`.
    Checkpoint = 5,
    /// Compressed page write (v2 format).
    ///
    /// ```text
    /// [Term: 8][TxID: 8][PageID: 4][Compression: 1][OrigLen: 4][DataLen: 4][Data: N][CRC: 4]
    /// ```
    /// `OrigLen` is the original (uncompressed) size; needed to pre-allocate
    /// the decompression buffer.
    PageWriteCompressed = 6,
    /// Logical autocommit transaction commit batch (v2 format).
    ///
    /// ```text
    /// [Term: 8][TxID: 8][ActionCount: 4][[DataLen: 4][Data: N]...][CRC: 4]
    /// ```
    TxCommitBatch = 7,
    /// Full-page image (FPI). Captures a complete page before its first
    /// modification within a checkpoint cycle so torn-page recovery can
    /// replay the pristine image before redo replays subsequent
    /// `PageWrite`s. Enables `fold_dwb_into_wal` to retire the `-dwb`
    /// sidecar (gh-478).
    ///
    /// ```text
    /// [Term: 8][TxID: 8][PageID: 4][CkptEpoch: 8][DataLen: 4][Data: N][CRC: 4]
    /// ```
    FullPageImage = 8,
    /// Logical vector insert for vector-turbo WAL replay.
    ///
    /// ```text
    /// [Term: 8][NameLen: 4][Name (UTF-8): N][EntityID: 8][Count: 4][F32: 4 * Count][CRC: 4]
    /// ```
    VectorInsert = 9,
}

impl RecordType {
    /// Decodes an on-disk type tag, returning `None` for unknown tags.
    pub fn from_u8(v: u8) -> Option<Self> {
        match v {
            1 => Some(RecordType::Begin),
            2 => Some(RecordType::Commit),
            3 => Some(RecordType::Rollback),
            4 => Some(RecordType::PageWrite),
            5 => Some(RecordType::Checkpoint),
            6 => Some(RecordType::PageWriteCompressed),
            7 => Some(RecordType::TxCommitBatch),
            8 => Some(RecordType::FullPageImage),
            9 => Some(RecordType::VectorInsert),
            _ => None,
        }
    }
}
92
/// One logical entry of the write-ahead log, as seen by callers.
///
/// The physical encoding (type tag, term envelope, optional compression,
/// checksum) is handled by `encode*` / `read*`; this type carries only the
/// decoded payload.
#[derive(Debug, Clone, PartialEq)]
pub enum WalRecord {
    /// Marks the start of transaction `tx_id`.
    Begin { tx_id: u64 },
    /// Marks the commit of transaction `tx_id`.
    Commit { tx_id: u64 },
    /// Marks the rollback of transaction `tx_id`.
    Rollback { tx_id: u64 },
    /// A page write. `data` is always the uncompressed page bytes: the
    /// encoder may store them compressed, and `read()` inflates them again.
    PageWrite {
        tx_id: u64,
        page_id: u32,
        data: Vec<u8>,
    },
    /// Checkpoint marker recording the LSN up to which pages are flushed.
    Checkpoint { lsn: u64 },
    /// An atomic logical commit batch: recovery applies the `actions`, in
    /// order, only if the whole record and its checksum are present.
    TxCommitBatch { tx_id: u64, actions: Vec<Vec<u8>> },
    /// Pristine page bytes captured before the first modification in a
    /// checkpoint cycle; applied ahead of redo so torn writes are repaired
    /// without the `-dwb` sidecar.
    FullPageImage {
        tx_id: u64,
        page_id: u32,
        ckpt_epoch: u64,
        data: Vec<u8>,
    },
    /// A logical FP32 vector insert, replayable into the in-memory
    /// vector-turbo index without snapshot files.
    VectorInsert {
        collection: String,
        entity_id: u64,
        vector: Vec<f32>,
    },
}
131
132impl WalRecord {
133    /// Serialize record to bytes (including checksum).
134    ///
135    /// `PageWrite` records whose payload is ≥ `COMPRESS_THRESHOLD` bytes are
136    /// compressed with zstd level 3 and emitted as `PageWriteCompressed`.
137    /// Smaller payloads use the plain `PageWrite` encoding (no overhead).
138    pub fn encode(&self) -> Vec<u8> {
139        self.encode_with_term(WAL_DEFAULT_TERM)
140    }
141
142    /// Serialize record to bytes with the replication term stamped into
143    /// the physical record envelope.
144    pub fn encode_with_term(&self, term: u64) -> Vec<u8> {
145        let mut buf = Vec::new();
146        self.encode_with_term_into(&mut buf, term);
147        buf
148    }
149
150    /// Serialize record into a caller-owned scratch buffer, appending the
151    /// encoded bytes (including checksum) to `out`.
152    ///
153    /// This is the allocation-light entry point for the lock-free append path:
154    /// concurrent appenders each encode into their own per-call `out` buffer
155    /// *before* taking the WAL lock, so the scratch is never shared across
156    /// threads and needs no `thread_local!`. Reusing one `out` across many
157    /// records (the commit blob) avoids the fresh `Vec` + copy that
158    /// [`encode`](Self::encode) allocates per record. The bytes appended are
159    /// byte-identical to `encode()`.
160    pub fn encode_into(&self, out: &mut Vec<u8>) {
161        self.encode_with_term_into(out, WAL_DEFAULT_TERM)
162    }
163
164    /// Serialize record into a caller-owned scratch buffer with the replication
165    /// term stamped into the physical envelope. See [`encode_into`](Self::encode_into).
166    ///
167    /// The checksum is computed over only the bytes this call appends (the slice
168    /// starting at the buffer's prior length), so appending after existing
169    /// records leaves them untouched and keeps each record's CRC self-contained.
170    pub fn encode_with_term_into(&self, out: &mut Vec<u8>, term: u64) {
171        // Offset where this record begins; the checksum covers only `out[start..]`
172        // so a reused scratch buffer's earlier records are excluded.
173        let start = out.len();
174        let buf = out;
175
176        // Layout (non-PageWrite):
177        // [Type: 1]
178        // [Term: 8]
179        // [TxID/LSN: 8]
180        // [Checksum: 4]
181        //
182        // PageWrite (uncompressed):
183        // [Type: 1][TxID: 8][PageID: 4][DataLen: 4][Data: N][CRC: 4]
184        //
185        // PageWriteCompressed:
186        // [Type: 1][TxID: 8][PageID: 4][Compression: 1][OrigLen: 4][DataLen: 4][Data: N][CRC: 4]
187        //
188        // TxCommitBatch:
189        // [Type: 1][TxID: 8][ActionCount: 4][[DataLen: 4][Data: N]...][CRC: 4]
190
191        match self {
192            WalRecord::Begin { tx_id } => {
193                buf.push(RecordType::Begin as u8);
194                buf.extend_from_slice(&term.to_le_bytes());
195                buf.extend_from_slice(&tx_id.to_le_bytes());
196            }
197            WalRecord::Commit { tx_id } => {
198                buf.push(RecordType::Commit as u8);
199                buf.extend_from_slice(&term.to_le_bytes());
200                buf.extend_from_slice(&tx_id.to_le_bytes());
201            }
202            WalRecord::Rollback { tx_id } => {
203                buf.push(RecordType::Rollback as u8);
204                buf.extend_from_slice(&term.to_le_bytes());
205                buf.extend_from_slice(&tx_id.to_le_bytes());
206            }
207            WalRecord::PageWrite {
208                tx_id,
209                page_id,
210                data,
211            } => {
212                if data.len() >= COMPRESS_THRESHOLD {
213                    // Try zstd compression; fall back to uncompressed if it expands.
214                    if let Ok(compressed) =
215                        zstd::bulk::compress(data.as_slice(), /* level */ 3)
216                    {
217                        if compressed.len() < data.len() {
218                            // Compressed is smaller — use compressed format.
219                            buf.push(RecordType::PageWriteCompressed as u8);
220                            buf.extend_from_slice(&term.to_le_bytes());
221                            buf.extend_from_slice(&tx_id.to_le_bytes());
222                            buf.extend_from_slice(&page_id.to_le_bytes());
223                            buf.push(Compression::Zstd as u8);
224                            buf.extend_from_slice(&(data.len() as u32).to_le_bytes()); // orig_len
225                            buf.extend_from_slice(&(compressed.len() as u32).to_le_bytes());
226                            buf.extend_from_slice(&compressed);
227                            let checksum = crc32(&buf[start..]);
228                            buf.extend_from_slice(&checksum.to_le_bytes());
229                            return;
230                        }
231                    }
232                }
233                // Uncompressed path (small payload or compression expanded).
234                buf.push(RecordType::PageWrite as u8);
235                buf.extend_from_slice(&term.to_le_bytes());
236                buf.extend_from_slice(&tx_id.to_le_bytes());
237                buf.extend_from_slice(&page_id.to_le_bytes());
238                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
239                buf.extend_from_slice(data);
240            }
241            WalRecord::TxCommitBatch { tx_id, actions } => {
242                buf.push(RecordType::TxCommitBatch as u8);
243                buf.extend_from_slice(&term.to_le_bytes());
244                buf.extend_from_slice(&tx_id.to_le_bytes());
245                buf.extend_from_slice(&(actions.len() as u32).to_le_bytes());
246                for action in actions {
247                    buf.extend_from_slice(&(action.len() as u32).to_le_bytes());
248                    buf.extend_from_slice(action);
249                }
250            }
251            WalRecord::FullPageImage {
252                tx_id,
253                page_id,
254                ckpt_epoch,
255                data,
256            } => {
257                buf.push(RecordType::FullPageImage as u8);
258                buf.extend_from_slice(&term.to_le_bytes());
259                buf.extend_from_slice(&tx_id.to_le_bytes());
260                buf.extend_from_slice(&page_id.to_le_bytes());
261                buf.extend_from_slice(&ckpt_epoch.to_le_bytes());
262                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
263                buf.extend_from_slice(data);
264            }
265            WalRecord::VectorInsert {
266                collection,
267                entity_id,
268                vector,
269            } => {
270                buf.push(RecordType::VectorInsert as u8);
271                buf.extend_from_slice(&term.to_le_bytes());
272                buf.extend_from_slice(&(collection.len() as u32).to_le_bytes());
273                buf.extend_from_slice(collection.as_bytes());
274                buf.extend_from_slice(&entity_id.to_le_bytes());
275                buf.extend_from_slice(&(vector.len() as u32).to_le_bytes());
276                for value in vector {
277                    buf.extend_from_slice(&value.to_le_bytes());
278                }
279            }
280            WalRecord::Checkpoint { lsn } => {
281                buf.push(RecordType::Checkpoint as u8);
282                buf.extend_from_slice(&term.to_le_bytes());
283                buf.extend_from_slice(&lsn.to_le_bytes());
284            }
285        }
286
287        // Calculate and append checksum over only this record's bytes.
288        let checksum = crc32(&buf[start..]);
289        buf.extend_from_slice(&checksum.to_le_bytes());
290    }
291
292    /// Read a record from a reader.
293    ///
294    /// Handles both v1 (`PageWrite`) and v2 (`PageWriteCompressed`) record
295    /// formats transparently — callers always receive uncompressed data.
296    pub fn read<R: Read>(reader: &mut R) -> io::Result<Option<WalRecord>> {
297        Ok(Self::read_with_term(reader)?.map(|(_, record)| record))
298    }
299
300    /// Read a record and return the term stamped into its physical envelope.
301    pub fn read_with_term<R: Read>(reader: &mut R) -> io::Result<Option<(u64, WalRecord)>> {
302        Self::read_with_format_version(reader, WAL_VERSION)
303    }
304
305    pub(crate) fn read_with_format_version<R: Read>(
306        reader: &mut R,
307        format_version: u8,
308    ) -> io::Result<Option<(u64, WalRecord)>> {
309        // Read type byte
310        let mut type_buf = [0u8; 1];
311        match reader.read_exact(&mut type_buf) {
312            Ok(_) => (),
313            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
314            Err(e) => return Err(e),
315        };
316
317        let record_type = RecordType::from_u8(type_buf[0])
318            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid record type"))?;
319
320        // Start checksum calculation
321        let mut running_crc = crc32_update(0, &type_buf);
322        let term = match format_version {
323            WAL_VERSION => {
324                let mut term_buf = [0u8; 8];
325                reader.read_exact(&mut term_buf)?;
326                running_crc = crc32_update(running_crc, &term_buf);
327                u64::from_le_bytes(term_buf)
328            }
329            WAL_VERSION_V2 => WAL_DEFAULT_TERM,
330            _ => {
331                return Err(io::Error::new(
332                    io::ErrorKind::InvalidData,
333                    format!("Unsupported WAL version: {format_version}"),
334                ));
335            }
336        };
337
338        let record = match record_type {
339            RecordType::Begin | RecordType::Commit | RecordType::Rollback => {
340                let mut buf = [0u8; 8];
341                reader.read_exact(&mut buf)?;
342                running_crc = crc32_update(running_crc, &buf);
343                let tx_id = u64::from_le_bytes(buf);
344
345                match record_type {
346                    RecordType::Begin => WalRecord::Begin { tx_id },
347                    RecordType::Commit => WalRecord::Commit { tx_id },
348                    RecordType::Rollback => WalRecord::Rollback { tx_id },
349                    _ => unreachable!(),
350                }
351            }
352            RecordType::PageWrite => {
353                // Read TxID
354                let mut tx_buf = [0u8; 8];
355                reader.read_exact(&mut tx_buf)?;
356                running_crc = crc32_update(running_crc, &tx_buf);
357                let tx_id = u64::from_le_bytes(tx_buf);
358
359                // Read PageID
360                let mut page_buf = [0u8; 4];
361                reader.read_exact(&mut page_buf)?;
362                running_crc = crc32_update(running_crc, &page_buf);
363                let page_id = u32::from_le_bytes(page_buf);
364
365                // Read Length
366                let mut len_buf = [0u8; 4];
367                reader.read_exact(&mut len_buf)?;
368                running_crc = crc32_update(running_crc, &len_buf);
369                let len = u32::from_le_bytes(len_buf) as usize;
370
371                // Read Data
372                let mut data = vec![0u8; len];
373                reader.read_exact(&mut data)?;
374                running_crc = crc32_update(running_crc, &data);
375
376                WalRecord::PageWrite {
377                    tx_id,
378                    page_id,
379                    data,
380                }
381            }
382            RecordType::PageWriteCompressed => {
383                // Read TxID
384                let mut tx_buf = [0u8; 8];
385                reader.read_exact(&mut tx_buf)?;
386                running_crc = crc32_update(running_crc, &tx_buf);
387                let tx_id = u64::from_le_bytes(tx_buf);
388
389                // Read PageID
390                let mut page_buf = [0u8; 4];
391                reader.read_exact(&mut page_buf)?;
392                running_crc = crc32_update(running_crc, &page_buf);
393                let page_id = u32::from_le_bytes(page_buf);
394
395                // Read Compression algorithm byte
396                let mut comp_buf = [0u8; 1];
397                reader.read_exact(&mut comp_buf)?;
398                running_crc = crc32_update(running_crc, &comp_buf);
399                let compression = Compression::from_u8(comp_buf[0]).ok_or_else(|| {
400                    io::Error::new(
401                        io::ErrorKind::InvalidData,
402                        format!("Unknown WAL compression algorithm: {}", comp_buf[0]),
403                    )
404                })?;
405
406                // Read original (uncompressed) length — used to pre-allocate decompression buffer
407                let mut orig_len_buf = [0u8; 4];
408                reader.read_exact(&mut orig_len_buf)?;
409                running_crc = crc32_update(running_crc, &orig_len_buf);
410                let orig_len = u32::from_le_bytes(orig_len_buf) as usize;
411
412                // Read compressed data length
413                let mut len_buf = [0u8; 4];
414                reader.read_exact(&mut len_buf)?;
415                running_crc = crc32_update(running_crc, &len_buf);
416                let len = u32::from_le_bytes(len_buf) as usize;
417
418                // Read compressed data
419                let mut compressed = vec![0u8; len];
420                reader.read_exact(&mut compressed)?;
421                running_crc = crc32_update(running_crc, &compressed);
422
423                // Decompress
424                let data = match compression {
425                    Compression::Zstd => {
426                        let mut out = vec![0u8; orig_len];
427                        zstd::bulk::decompress_to_buffer(&compressed, &mut out).map_err(|e| {
428                            io::Error::new(
429                                io::ErrorKind::InvalidData,
430                                format!("WAL zstd decompress failed: {e}"),
431                            )
432                        })?;
433                        out
434                    }
435                    Compression::None => compressed,
436                };
437
438                WalRecord::PageWrite {
439                    tx_id,
440                    page_id,
441                    data,
442                }
443            }
444            RecordType::TxCommitBatch => {
445                let mut tx_buf = [0u8; 8];
446                reader.read_exact(&mut tx_buf)?;
447                running_crc = crc32_update(running_crc, &tx_buf);
448                let tx_id = u64::from_le_bytes(tx_buf);
449
450                let mut count_buf = [0u8; 4];
451                reader.read_exact(&mut count_buf)?;
452                running_crc = crc32_update(running_crc, &count_buf);
453                let count = u32::from_le_bytes(count_buf) as usize;
454
455                let mut actions = Vec::with_capacity(count);
456                for _ in 0..count {
457                    let mut len_buf = [0u8; 4];
458                    reader.read_exact(&mut len_buf)?;
459                    running_crc = crc32_update(running_crc, &len_buf);
460                    let len = u32::from_le_bytes(len_buf) as usize;
461
462                    let mut action = vec![0u8; len];
463                    reader.read_exact(&mut action)?;
464                    running_crc = crc32_update(running_crc, &action);
465                    actions.push(action);
466                }
467
468                WalRecord::TxCommitBatch { tx_id, actions }
469            }
470            RecordType::VectorInsert => {
471                let mut len_buf = [0u8; 4];
472                reader.read_exact(&mut len_buf)?;
473                running_crc = crc32_update(running_crc, &len_buf);
474                let collection_len = u32::from_le_bytes(len_buf) as usize;
475
476                let mut collection_buf = vec![0u8; collection_len];
477                reader.read_exact(&mut collection_buf)?;
478                running_crc = crc32_update(running_crc, &collection_buf);
479                let collection = String::from_utf8(collection_buf).map_err(|err| {
480                    io::Error::new(
481                        io::ErrorKind::InvalidData,
482                        format!("invalid collection utf8: {err}"),
483                    )
484                })?;
485
486                let mut entity_buf = [0u8; 8];
487                reader.read_exact(&mut entity_buf)?;
488                running_crc = crc32_update(running_crc, &entity_buf);
489                let entity_id = u64::from_le_bytes(entity_buf);
490
491                let mut count_buf = [0u8; 4];
492                reader.read_exact(&mut count_buf)?;
493                running_crc = crc32_update(running_crc, &count_buf);
494                let count = u32::from_le_bytes(count_buf) as usize;
495
496                let mut vector = Vec::with_capacity(count);
497                for _ in 0..count {
498                    let mut value_buf = [0u8; 4];
499                    reader.read_exact(&mut value_buf)?;
500                    running_crc = crc32_update(running_crc, &value_buf);
501                    vector.push(f32::from_le_bytes(value_buf));
502                }
503
504                WalRecord::VectorInsert {
505                    collection,
506                    entity_id,
507                    vector,
508                }
509            }
510            RecordType::FullPageImage => {
511                let mut tx_buf = [0u8; 8];
512                reader.read_exact(&mut tx_buf)?;
513                running_crc = crc32_update(running_crc, &tx_buf);
514                let tx_id = u64::from_le_bytes(tx_buf);
515
516                let mut page_buf = [0u8; 4];
517                reader.read_exact(&mut page_buf)?;
518                running_crc = crc32_update(running_crc, &page_buf);
519                let page_id = u32::from_le_bytes(page_buf);
520
521                let mut epoch_buf = [0u8; 8];
522                reader.read_exact(&mut epoch_buf)?;
523                running_crc = crc32_update(running_crc, &epoch_buf);
524                let ckpt_epoch = u64::from_le_bytes(epoch_buf);
525
526                let mut len_buf = [0u8; 4];
527                reader.read_exact(&mut len_buf)?;
528                running_crc = crc32_update(running_crc, &len_buf);
529                let len = u32::from_le_bytes(len_buf) as usize;
530
531                let mut data = vec![0u8; len];
532                reader.read_exact(&mut data)?;
533                running_crc = crc32_update(running_crc, &data);
534
535                WalRecord::FullPageImage {
536                    tx_id,
537                    page_id,
538                    ckpt_epoch,
539                    data,
540                }
541            }
542            RecordType::Checkpoint => {
543                let mut buf = [0u8; 8];
544                reader.read_exact(&mut buf)?;
545                running_crc = crc32_update(running_crc, &buf);
546                let lsn = u64::from_le_bytes(buf);
547                WalRecord::Checkpoint { lsn }
548            }
549        };
550
551        // Verify checksum
552        let mut crc_buf = [0u8; 4];
553        reader.read_exact(&mut crc_buf)?;
554        let stored_crc = u32::from_le_bytes(crc_buf);
555
556        if running_crc != stored_crc {
557            return Err(io::Error::new(
558                io::ErrorKind::InvalidData,
559                "WAL record checksum mismatch",
560            ));
561        }
562
563        Ok(Some((term, record)))
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    use super::*;
570    use std::io::Cursor;
571
572    // ==================== RecordType Tests ====================
573
574    #[test]
575    fn test_record_type_from_u8() {
576        assert_eq!(RecordType::from_u8(1), Some(RecordType::Begin));
577        assert_eq!(RecordType::from_u8(2), Some(RecordType::Commit));
578        assert_eq!(RecordType::from_u8(3), Some(RecordType::Rollback));
579        assert_eq!(RecordType::from_u8(4), Some(RecordType::PageWrite));
580        assert_eq!(RecordType::from_u8(5), Some(RecordType::Checkpoint));
581        assert_eq!(
582            RecordType::from_u8(6),
583            Some(RecordType::PageWriteCompressed)
584        );
585        assert_eq!(RecordType::from_u8(7), Some(RecordType::TxCommitBatch));
586        assert_eq!(RecordType::from_u8(8), Some(RecordType::FullPageImage));
587        assert_eq!(RecordType::from_u8(9), Some(RecordType::VectorInsert));
588    }
589
590    #[test]
591    fn test_record_type_invalid() {
592        assert_eq!(RecordType::from_u8(0), None);
593        assert_eq!(RecordType::from_u8(10), None);
594        assert_eq!(RecordType::from_u8(255), None);
595    }
596
597    // ==================== WalRecord::encode Tests ====================
598
599    #[test]
600    fn test_encode_begin() {
601        let record = WalRecord::Begin { tx_id: 12345 };
602        let encoded = record.encode();
603
604        // Type (1) + Term (8) + TxID (8) + Checksum (4) = 21 bytes
605        assert_eq!(encoded.len(), 21);
606        assert_eq!(encoded[0], RecordType::Begin as u8);
607    }
608
609    #[test]
610    fn test_encode_commit() {
611        let record = WalRecord::Commit { tx_id: 99999 };
612        let encoded = record.encode();
613
614        assert_eq!(encoded.len(), 21);
615        assert_eq!(encoded[0], RecordType::Commit as u8);
616    }
617
618    #[test]
619    fn test_encode_rollback() {
620        let record = WalRecord::Rollback { tx_id: 54321 };
621        let encoded = record.encode();
622
623        assert_eq!(encoded.len(), 21);
624        assert_eq!(encoded[0], RecordType::Rollback as u8);
625    }
626
627    #[test]
628    fn test_encode_checkpoint() {
629        let record = WalRecord::Checkpoint { lsn: 1000000 };
630        let encoded = record.encode();
631
632        assert_eq!(encoded.len(), 21);
633        assert_eq!(encoded[0], RecordType::Checkpoint as u8);
634    }
635
636    #[test]
637    fn test_encode_page_write_small() {
638        // Small data (< COMPRESS_THRESHOLD) stays uncompressed.
639        let data = vec![1, 2, 3, 4, 5];
640        let record = WalRecord::PageWrite {
641            tx_id: 100,
642            page_id: 42,
643            data: data.clone(),
644        };
645        let encoded = record.encode();
646
647        // Type (1) + Term (8) + TxID (8) + PageID (4) + Len (4) + Data (5) + Checksum (4) = 34 bytes
648        assert_eq!(encoded.len(), 34);
649        assert_eq!(encoded[0], RecordType::PageWrite as u8);
650    }
651
652    #[test]
653    fn test_encode_page_write_empty_data() {
654        let record = WalRecord::PageWrite {
655            tx_id: 1,
656            page_id: 0,
657            data: vec![],
658        };
659        let encoded = record.encode();
660
661        // Type (1) + Term (8) + TxID (8) + PageID (4) + Len (4) + Checksum (4) = 29 bytes
662        assert_eq!(encoded.len(), 29);
663    }
664
665    #[test]
666    fn test_encode_tx_commit_batch() {
667        let record = WalRecord::TxCommitBatch {
668            tx_id: 7,
669            actions: vec![b"insert".to_vec(), b"update".to_vec()],
670        };
671        let encoded = record.encode();
672
673        assert_eq!(encoded[0], RecordType::TxCommitBatch as u8);
674    }
675
676    // ==================== WalRecord::read Tests ====================
677
678    #[test]
679    fn test_read_begin_roundtrip() {
680        let original = WalRecord::Begin { tx_id: 42 };
681        let encoded = original.encode();
682
683        let mut cursor = Cursor::new(encoded);
684        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
685
686        assert_eq!(decoded, original);
687    }
688
689    #[test]
690    fn test_read_begin_roundtrip_preserves_term() {
691        let original = WalRecord::Begin { tx_id: 42 };
692        let encoded = original.encode_with_term(9);
693
694        let mut cursor = Cursor::new(encoded);
695        let (term, decoded) = WalRecord::read_with_term(&mut cursor).unwrap().unwrap();
696
697        assert_eq!(term, 9);
698        assert_eq!(decoded, original);
699    }
700
701    #[test]
702    fn test_read_v2_begin_defaults_term() {
703        let tx_id = 42u64;
704        let mut encoded = Vec::new();
705        encoded.push(RecordType::Begin as u8);
706        encoded.extend_from_slice(&tx_id.to_le_bytes());
707        let checksum = crc32(&encoded);
708        encoded.extend_from_slice(&checksum.to_le_bytes());
709
710        let mut cursor = Cursor::new(encoded);
711        let (term, decoded) = WalRecord::read_with_format_version(&mut cursor, WAL_VERSION_V2)
712            .unwrap()
713            .unwrap();
714
715        assert_eq!(term, WAL_DEFAULT_TERM);
716        assert_eq!(decoded, WalRecord::Begin { tx_id });
717    }
718
719    #[test]
720    fn test_read_commit_roundtrip() {
721        let original = WalRecord::Commit { tx_id: 999 };
722        let encoded = original.encode();
723
724        let mut cursor = Cursor::new(encoded);
725        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
726
727        assert_eq!(decoded, original);
728    }
729
730    #[test]
731    fn test_read_rollback_roundtrip() {
732        let original = WalRecord::Rollback { tx_id: 777 };
733        let encoded = original.encode();
734
735        let mut cursor = Cursor::new(encoded);
736        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
737
738        assert_eq!(decoded, original);
739    }
740
741    #[test]
742    fn test_read_checkpoint_roundtrip() {
743        let original = WalRecord::Checkpoint { lsn: 123456789 };
744        let encoded = original.encode();
745
746        let mut cursor = Cursor::new(encoded);
747        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
748
749        assert_eq!(decoded, original);
750    }
751
752    #[test]
753    fn test_read_page_write_roundtrip() {
754        let original = WalRecord::PageWrite {
755            tx_id: 50,
756            page_id: 100,
757            data: vec![10, 20, 30, 40, 50, 60, 70, 80],
758        };
759        let encoded = original.encode();
760
761        let mut cursor = Cursor::new(encoded);
762        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
763
764        assert_eq!(decoded, original);
765    }
766
767    #[test]
768    fn test_read_tx_commit_batch_roundtrip() {
769        let original = WalRecord::TxCommitBatch {
770            tx_id: 42,
771            actions: vec![b"old-version".to_vec(), b"new-version".to_vec()],
772        };
773        let encoded = original.encode();
774
775        let mut cursor = Cursor::new(encoded);
776        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
777
778        assert_eq!(decoded, original);
779    }
780
781    #[test]
782    fn test_vector_insert_roundtrip() {
783        let original = WalRecord::VectorInsert {
784            collection: "turbo".to_string(),
785            entity_id: 42,
786            vector: vec![1.0, -0.5, 0.25],
787        };
788        let encoded = original.encode();
789
790        let mut cursor = Cursor::new(encoded);
791        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
792
793        assert_eq!(decoded, original);
794    }
795
796    #[test]
797    fn test_read_page_write_large_data() {
798        // Large enough to trigger compression.
799        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
800        let original = WalRecord::PageWrite {
801            tx_id: 1,
802            page_id: 0,
803            data,
804        };
805        let encoded = original.encode();
806
807        let mut cursor = Cursor::new(encoded);
808        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
809
810        // Round-trip: decoded data matches original (even if encoded differently).
811        assert_eq!(decoded, original);
812    }
813
814    #[test]
815    fn page_write_compressed_roundtrip() {
816        // Highly compressible payload: 1 KiB of repeated bytes.
817        let data = vec![0xABu8; 1024];
818        let record = WalRecord::PageWrite {
819            tx_id: 7,
820            page_id: 3,
821            data: data.clone(),
822        };
823        let encoded = record.encode();
824
825        // Should be stored as PageWriteCompressed (compressible > threshold).
826        assert_eq!(encoded[0], RecordType::PageWriteCompressed as u8);
827
828        // And round-trip decoding recovers original.
829        let mut cursor = Cursor::new(encoded);
830        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
831        assert_eq!(
832            decoded,
833            WalRecord::PageWrite {
834                tx_id: 7,
835                page_id: 3,
836                data
837            }
838        );
839    }
840
841    #[test]
842    fn full_page_image_roundtrip() {
843        let data: Vec<u8> = (0..4096).map(|i| (i % 251) as u8).collect();
844        let original = WalRecord::FullPageImage {
845            tx_id: 11,
846            page_id: 9,
847            ckpt_epoch: 42,
848            data: data.clone(),
849        };
850        let encoded = original.encode();
851        assert_eq!(encoded[0], RecordType::FullPageImage as u8);
852
853        let mut cursor = Cursor::new(encoded);
854        let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
855        assert_eq!(decoded, original);
856    }
857
858    #[test]
859    fn full_page_image_checksum_mismatch_detected() {
860        let original = WalRecord::FullPageImage {
861            tx_id: 1,
862            page_id: 2,
863            ckpt_epoch: 3,
864            data: vec![0xAA; 32],
865        };
866        let mut encoded = original.encode();
867        let mid = encoded.len() / 2;
868        encoded[mid] ^= 0xFF;
869        let mut cursor = Cursor::new(encoded);
870        assert!(WalRecord::read(&mut cursor).is_err());
871    }
872
873    #[test]
874    fn test_read_eof() {
875        let mut cursor = Cursor::new(Vec::<u8>::new());
876        let result = WalRecord::read(&mut cursor).unwrap();
877        assert!(result.is_none());
878    }
879
880    #[test]
881    fn test_read_invalid_record_type() {
882        let buf = vec![99, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // Invalid type 99
883        let mut cursor = Cursor::new(buf);
884        let result = WalRecord::read(&mut cursor);
885        assert!(result.is_err());
886    }
887
888    #[test]
889    fn test_read_checksum_mismatch() {
890        let record = WalRecord::Begin { tx_id: 42 };
891        let mut encoded = record.encode();
892
893        // Corrupt the last byte (checksum)
894        let len = encoded.len();
895        encoded[len - 1] ^= 0xFF;
896
897        let mut cursor = Cursor::new(encoded);
898        let result = WalRecord::read(&mut cursor);
899        assert!(result.is_err());
900    }
901
902    #[test]
903    fn test_read_data_corruption() {
904        let record = WalRecord::PageWrite {
905            tx_id: 1,
906            page_id: 2,
907            data: vec![1, 2, 3, 4],
908        };
909        let mut encoded = record.encode();
910
911        // Corrupt a data byte
912        encoded[15] ^= 0xFF;
913
914        let mut cursor = Cursor::new(encoded);
915        let result = WalRecord::read(&mut cursor);
916        assert!(result.is_err()); // Checksum will fail
917    }
918
919    // ==================== Multiple Records Tests ====================
920
921    #[test]
922    fn test_multiple_records_sequential() {
923        let records = vec![
924            WalRecord::Begin { tx_id: 1 },
925            WalRecord::PageWrite {
926                tx_id: 1,
927                page_id: 10,
928                data: vec![1, 2, 3],
929            },
930            WalRecord::PageWrite {
931                tx_id: 1,
932                page_id: 20,
933                data: vec![4, 5, 6],
934            },
935            WalRecord::Commit { tx_id: 1 },
936            WalRecord::Checkpoint { lsn: 100 },
937        ];
938
939        // Encode all
940        let mut buf = Vec::new();
941        for r in &records {
942            buf.extend_from_slice(&r.encode());
943        }
944
945        // Read them back
946        let mut cursor = Cursor::new(buf);
947        for expected in &records {
948            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
949            assert_eq!(&decoded, expected);
950        }
951
952        // Next read should return None (EOF)
953        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
954    }
955
    // ==================== encode_into scratch-buffer Tests ====================
959
960    /// `encode_into` appended to a fresh scratch must be byte-identical to the
961    /// per-allocation `encode()` baseline, for every record variant.
962    #[test]
963    fn test_encode_into_matches_encode_for_all_variants() {
964        let records = vec![
965            WalRecord::Begin { tx_id: 12345 },
966            WalRecord::Commit { tx_id: 99999 },
967            WalRecord::Rollback { tx_id: 54321 },
968            WalRecord::Checkpoint { lsn: 1_000_000 },
969            WalRecord::PageWrite {
970                tx_id: 100,
971                page_id: 42,
972                data: vec![1, 2, 3, 4, 5],
973            },
974            // Large, highly compressible payload → exercises the
975            // PageWriteCompressed early-return branch.
976            WalRecord::PageWrite {
977                tx_id: 7,
978                page_id: 3,
979                data: vec![0xABu8; 1024],
980            },
981            WalRecord::TxCommitBatch {
982                tx_id: 7,
983                actions: vec![b"insert".to_vec(), b"update".to_vec()],
984            },
985            WalRecord::FullPageImage {
986                tx_id: 11,
987                page_id: 9,
988                ckpt_epoch: 42,
989                data: (0..4096).map(|i| (i % 251) as u8).collect(),
990            },
991            WalRecord::VectorInsert {
992                collection: "turbo".to_string(),
993                entity_id: 42,
994                vector: vec![1.0, -0.5, 0.25],
995            },
996        ];
997
998        for record in &records {
999            let baseline = record.encode();
1000            let mut scratch = Vec::new();
1001            record.encode_into(&mut scratch);
1002            assert_eq!(scratch, baseline, "encode_into mismatch for {record:?}");
1003        }
1004    }
1005
1006    /// Reusing one scratch buffer across several records yields exactly the
1007    /// concatenation of the per-record `encode()` baselines — proving the
1008    /// checksum is computed over each record's own slice, not the whole buffer.
1009    #[test]
1010    fn test_encode_into_reuses_scratch_across_records() {
1011        let records = vec![
1012            WalRecord::Begin { tx_id: 1 },
1013            WalRecord::PageWrite {
1014                tx_id: 1,
1015                page_id: 10,
1016                data: vec![1, 2, 3],
1017            },
1018            WalRecord::Commit { tx_id: 1 },
1019        ];
1020
1021        let mut expected = Vec::new();
1022        for r in &records {
1023            expected.extend_from_slice(&r.encode());
1024        }
1025
1026        // One scratch, reused for every record — no per-record allocation.
1027        let mut scratch = Vec::new();
1028        for r in &records {
1029            r.encode_into(&mut scratch);
1030        }
1031
1032        assert_eq!(scratch, expected);
1033
1034        // And the concatenation round-trips back to the original records.
1035        let mut cursor = Cursor::new(scratch);
1036        for expected in &records {
1037            let decoded = WalRecord::read(&mut cursor).unwrap().unwrap();
1038            assert_eq!(&decoded, expected);
1039        }
1040        assert!(WalRecord::read(&mut cursor).unwrap().is_none());
1041    }
1042
1043    /// `encode_with_term_into` honours the term and matches the allocating
1044    /// `encode_with_term` baseline even when appended after existing bytes.
1045    #[test]
1046    fn test_encode_with_term_into_matches_and_preserves_prefix() {
1047        let prefix = b"PREFIX-BYTES".to_vec();
1048        let record = WalRecord::Begin { tx_id: 42 };
1049
1050        let mut scratch = prefix.clone();
1051        record.encode_with_term_into(&mut scratch, 9);
1052
1053        // Prefix untouched; suffix equals the allocating baseline.
1054        assert_eq!(&scratch[..prefix.len()], &prefix[..]);
1055        assert_eq!(&scratch[prefix.len()..], &record.encode_with_term(9)[..]);
1056    }
1057
1058    #[test]
1059    fn test_wal_magic() {
1060        assert_eq!(WAL_MAGIC, b"RDBW");
1061    }
1062
1063    #[test]
1064    fn test_wal_version() {
1065        assert_eq!(WAL_VERSION, 3);
1066    }
1067}