Skip to main content

sochdb_storage/
zero_copy_serde.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Zero-Copy Serialization (Recommendation 6)
19//!
20//! ## Problem
21//!
22//! Traditional serialization (bincode/serde) requires:
23//! 1. Serialize: Marshal data into bytes
24//! 2. Write: Write bytes to WAL
25//! 3. Read: Read bytes from WAL
26//! 4. Deserialize: Unmarshal bytes back to structs
27//!
28//! Deserialization is expensive:
29//! ```text
30//! bincode::deserialize<WalEntry>:
31//!   - Parse header: ~50ns
32//!   - Allocate strings: ~200ns (heap allocation)
33//!   - Copy data: ~100ns
34//!   - Total: ~350ns per entry
35//! ```
36//!
37//! ## Solution
38//!
39//! Zero-copy serialization where the serialized format IS the in-memory format.
40//! Data can be accessed directly from memory-mapped files without deserialization.
41//!
42//! ```text
43//! Zero-copy read:
44//!   - Validate header: ~10ns
45//!   - Return reference: ~5ns
46//!   - Total: ~15ns per entry (23x faster)
47//! ```
48//!
49//! ## Design
50//!
51//! Fixed-size header followed by variable-length data:
52//! ```text
53//! ┌─────────────────────────────────────────────────────────────┐
54//! │ Magic (4B) │ Version (2B) │ Flags (2B) │ Length (4B) │ CRC │
55//! ├─────────────────────────────────────────────────────────────┤
56//! │                    Fixed-size fields                        │
57//! ├─────────────────────────────────────────────────────────────┤
58//! │ Offset table (variable-length field offsets)                │
59//! ├─────────────────────────────────────────────────────────────┤
60//! │                    Variable-length data                     │
61//! └─────────────────────────────────────────────────────────────┘
62//! ```
63
64use std::mem::size_of;
65
/// Magic number identifying the zero-copy format
///
/// The hex literal spells "ZCOP"; headers are encoded little-endian, so the
/// serialized bytes appear as `50 4F 43 5A`.
pub const ZERO_COPY_MAGIC: u32 = 0x5A43_4F50; // "ZCOP"

/// Format version stamped into newly created headers
pub const FORMAT_VERSION: u16 = 1;

/// Size of the serialized outer header in bytes
pub const HEADER_SIZE: usize = 16;

// =============================================================================
// Zero-Copy Header
// =============================================================================

/// Outer header that prefixes zero-copy serialized data
///
/// Serialized as 16 little-endian bytes: magic (0..4), version (4..6),
/// flags (6..8), total length (8..12) and CRC (12..16).
#[repr(C, packed)]
#[derive(Debug, Clone, Copy)]
pub struct ZeroCopyHeader {
    /// Magic number for format validation
    pub magic: u32,
    /// Format version
    pub version: u16,
    /// Flags (compression, etc.)
    pub flags: u16,
    /// Total length including header
    pub total_length: u32,
    /// CRC32 of data (excluding header)
    pub crc: u32,
}

impl ZeroCopyHeader {
    /// Build a header for a payload of `data_length` bytes.
    ///
    /// The magic and version are filled in from the module constants.
    /// Note that `total_length` is stored as `u32`; the cast truncates if
    /// `HEADER_SIZE + data_length` does not fit.
    pub fn new(data_length: usize, flags: u16, crc: u32) -> Self {
        let total_length = (HEADER_SIZE + data_length) as u32;
        Self {
            magic: ZERO_COPY_MAGIC,
            version: FORMAT_VERSION,
            flags,
            total_length,
            crc,
        }
    }

    /// Returns `true` when the magic matches and the version is not newer
    /// than [`FORMAT_VERSION`].
    #[inline]
    pub fn validate(&self) -> bool {
        // Copy the fields out first: the struct is packed, so they must be
        // read by value rather than by reference.
        let magic = self.magic;
        let version = self.version;
        if magic != ZERO_COPY_MAGIC {
            return false;
        }
        version <= FORMAT_VERSION
    }

    /// Encode the header into the first [`HEADER_SIZE`] bytes of `buf`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than [`HEADER_SIZE`].
    pub fn write_to(&self, buf: &mut [u8]) {
        assert!(buf.len() >= HEADER_SIZE);

        // Stage the encoding in a fixed array, then copy it out in one go.
        let mut encoded = [0u8; HEADER_SIZE];
        encoded[..4].copy_from_slice(&self.magic.to_le_bytes());
        encoded[4..6].copy_from_slice(&self.version.to_le_bytes());
        encoded[6..8].copy_from_slice(&self.flags.to_le_bytes());
        encoded[8..12].copy_from_slice(&self.total_length.to_le_bytes());
        encoded[12..].copy_from_slice(&self.crc.to_le_bytes());
        buf[..HEADER_SIZE].copy_from_slice(&encoded);
    }

    /// Decode a header from the start of `buf`.
    ///
    /// Returns `None` when fewer than [`HEADER_SIZE`] bytes are available.
    /// No validation is performed; call [`ZeroCopyHeader::validate`] on the
    /// result.
    pub fn read_from(buf: &[u8]) -> Option<Self> {
        let raw = buf.get(..HEADER_SIZE)?;
        let u16_at = |at: usize| u16::from_le_bytes([raw[at], raw[at + 1]]);
        let u32_at =
            |at: usize| u32::from_le_bytes([raw[at], raw[at + 1], raw[at + 2], raw[at + 3]]);

        Some(Self {
            magic: u32_at(0),
            version: u16_at(4),
            flags: u16_at(6),
            total_length: u32_at(8),
            crc: u32_at(12),
        })
    }
}
137
138// =============================================================================
139// Zero-Copy WAL Entry
140// =============================================================================
141
/// WAL entry type
///
/// Each variant's discriminant is the tag byte used to represent it.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    /// Insert operation
    Insert = 1,
    /// Update operation
    Update = 2,
    /// Delete operation
    Delete = 3,
    /// Begin transaction
    BeginTxn = 4,
    /// Commit transaction
    CommitTxn = 5,
    /// Abort transaction
    AbortTxn = 6,
    /// Checkpoint marker
    Checkpoint = 7,
}

impl WalEntryType {
    /// Decode a raw tag byte, returning `None` for values that do not
    /// correspond to a known entry type.
    pub fn from_u8(v: u8) -> Option<Self> {
        const KNOWN: [WalEntryType; 7] = [
            WalEntryType::Insert,
            WalEntryType::Update,
            WalEntryType::Delete,
            WalEntryType::BeginTxn,
            WalEntryType::CommitTxn,
            WalEntryType::AbortTxn,
            WalEntryType::Checkpoint,
        ];

        KNOWN.iter().copied().find(|kind| *kind as u8 == v)
    }
}
176
177/// Fixed-size WAL entry header (32 bytes)
178#[repr(C)]
179#[derive(Debug, Clone, Copy)]
180pub struct WalEntryHeader {
181    /// Transaction ID
182    pub txn_id: u64,
183    /// Log sequence number
184    pub lsn: u64,
185    /// Timestamp (nanoseconds since epoch)
186    pub timestamp: u64,
187    /// Entry type
188    pub entry_type: u8,
189    /// Number of variable-length fields
190    pub field_count: u8,
191    /// Reserved for alignment
192    pub _reserved: [u8; 6],
193}
194
195pub const WAL_ENTRY_HEADER_SIZE: usize = size_of::<WalEntryHeader>();
196
197impl WalEntryHeader {
198    pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType, field_count: u8) -> Self {
199        Self {
200            txn_id,
201            lsn,
202            timestamp: std::time::SystemTime::now()
203                .duration_since(std::time::UNIX_EPOCH)
204                .map(|d| d.as_nanos() as u64)
205                .unwrap_or(0),
206            entry_type: entry_type as u8,
207            field_count,
208            _reserved: [0; 6],
209        }
210    }
211
212    /// Write to buffer
213    pub fn write_to(&self, buf: &mut [u8]) {
214        assert!(buf.len() >= WAL_ENTRY_HEADER_SIZE);
215        buf[0..8].copy_from_slice(&self.txn_id.to_le_bytes());
216        buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
217        buf[16..24].copy_from_slice(&self.timestamp.to_le_bytes());
218        buf[24] = self.entry_type;
219        buf[25] = self.field_count;
220        buf[26..32].copy_from_slice(&self._reserved);
221    }
222
223    /// Read from buffer (zero-copy)
224    #[inline]
225    pub fn read_from(buf: &[u8]) -> Option<&Self> {
226        if buf.len() < WAL_ENTRY_HEADER_SIZE {
227            return None;
228        }
229        // Safety: We've verified length and WalEntryHeader is repr(C)
230        // This is the zero-copy read - no deserialization!
231        unsafe {
232            let ptr = buf.as_ptr() as *const Self;
233            // Verify alignment
234            if ptr as usize % std::mem::align_of::<Self>() != 0 {
235                return None;
236            }
237            Some(&*ptr)
238        }
239    }
240
241    /// Read from buffer (safe copy version for unaligned data)
242    pub fn read_from_copy(buf: &[u8]) -> Option<Self> {
243        if buf.len() < WAL_ENTRY_HEADER_SIZE {
244            return None;
245        }
246        Some(Self {
247            txn_id: u64::from_le_bytes(buf[0..8].try_into().ok()?),
248            lsn: u64::from_le_bytes(buf[8..16].try_into().ok()?),
249            timestamp: u64::from_le_bytes(buf[16..24].try_into().ok()?),
250            entry_type: buf[24],
251            field_count: buf[25],
252            _reserved: buf[26..32].try_into().ok()?,
253        })
254    }
255}
256
257// =============================================================================
258// Zero-Copy WAL Entry Builder
259// =============================================================================
260
261/// Field descriptor for variable-length fields
262#[repr(C)]
263#[derive(Debug, Clone, Copy)]
264pub struct FieldDescriptor {
265    /// Offset from start of data section
266    pub offset: u32,
267    /// Length of field
268    pub length: u32,
269}
270
271pub const FIELD_DESCRIPTOR_SIZE: usize = size_of::<FieldDescriptor>();
272
273/// Builder for zero-copy WAL entries
274pub struct WalEntryBuilder {
275    /// Fixed header
276    header: WalEntryHeader,
277    /// Field descriptors
278    fields: Vec<FieldDescriptor>,
279    /// Variable-length data
280    data: Vec<u8>,
281}
282
283impl WalEntryBuilder {
284    /// Create new builder
285    pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType) -> Self {
286        Self {
287            header: WalEntryHeader::new(txn_id, lsn, entry_type, 0),
288            fields: Vec::new(),
289            data: Vec::new(),
290        }
291    }
292
293    /// Add a variable-length field
294    pub fn add_field(&mut self, data: &[u8]) -> &mut Self {
295        let offset = self.data.len() as u32;
296        let length = data.len() as u32;
297        self.fields.push(FieldDescriptor { offset, length });
298        self.data.extend_from_slice(data);
299        self.header.field_count = self.fields.len() as u8;
300        self
301    }
302
303    /// Add key field
304    pub fn with_key(&mut self, key: &[u8]) -> &mut Self {
305        self.add_field(key)
306    }
307
308    /// Add value field
309    pub fn with_value(&mut self, value: &[u8]) -> &mut Self {
310        self.add_field(value)
311    }
312
313    /// Calculate total size
314    pub fn total_size(&self) -> usize {
315        HEADER_SIZE
316            + WAL_ENTRY_HEADER_SIZE
317            + self.fields.len() * FIELD_DESCRIPTOR_SIZE
318            + self.data.len()
319    }
320
321    /// Build into bytes
322    pub fn build(&self) -> Vec<u8> {
323        let data_len =
324            WAL_ENTRY_HEADER_SIZE + self.fields.len() * FIELD_DESCRIPTOR_SIZE + self.data.len();
325
326        let mut buf = vec![0u8; HEADER_SIZE + data_len];
327
328        // Calculate CRC of data portion
329        let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
330
331        // Write header
332        let header = ZeroCopyHeader::new(data_len, 0, crc);
333        header.write_to(&mut buf[0..HEADER_SIZE]);
334
335        // Write WAL entry header
336        let offset = HEADER_SIZE;
337        self.header
338            .write_to(&mut buf[offset..offset + WAL_ENTRY_HEADER_SIZE]);
339
340        // Write field descriptors
341        let mut offset = HEADER_SIZE + WAL_ENTRY_HEADER_SIZE;
342        for field in &self.fields {
343            buf[offset..offset + 4].copy_from_slice(&field.offset.to_le_bytes());
344            buf[offset + 4..offset + 8].copy_from_slice(&field.length.to_le_bytes());
345            offset += FIELD_DESCRIPTOR_SIZE;
346        }
347
348        // Write data
349        buf[offset..].copy_from_slice(&self.data);
350
351        // Update CRC
352        let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
353        buf[12..16].copy_from_slice(&crc.to_le_bytes());
354
355        buf
356    }
357}
358
359// =============================================================================
360// Zero-Copy WAL Entry Reader
361// =============================================================================
362
363/// Zero-copy WAL entry reader
364///
365/// Provides direct access to WAL entry fields without deserialization.
366/// The backing data must remain valid for the lifetime of this reader.
367pub struct WalEntryReader<'a> {
368    /// Raw backing data
369    data: &'a [u8],
370    /// Parsed header
371    header: &'a WalEntryHeader,
372    /// Field count
373    field_count: usize,
374    /// Offset to field descriptors
375    fields_offset: usize,
376    /// Offset to data section
377    data_offset: usize,
378}
379
380impl<'a> WalEntryReader<'a> {
381    /// Create reader from raw bytes (zero-copy)
382    pub fn from_bytes(bytes: &'a [u8]) -> Option<Self> {
383        // Validate outer header
384        let outer_header = ZeroCopyHeader::read_from(bytes)?;
385        if !outer_header.validate() {
386            return None;
387        }
388
389        // Validate CRC
390        let expected_crc = outer_header.crc;
391        let actual_crc = crc32fast::hash(&bytes[HEADER_SIZE..]);
392        if expected_crc != actual_crc {
393            return None;
394        }
395
396        // Read WAL entry header (zero-copy if aligned)
397        let entry_data = &bytes[HEADER_SIZE..];
398        let header = WalEntryHeader::read_from(entry_data)?;
399
400        let field_count = header.field_count as usize;
401        let fields_offset = WAL_ENTRY_HEADER_SIZE;
402        let data_offset = fields_offset + field_count * FIELD_DESCRIPTOR_SIZE;
403
404        Some(Self {
405            data: entry_data,
406            header,
407            field_count,
408            fields_offset,
409            data_offset,
410        })
411    }
412
413    /// Get transaction ID
414    #[inline]
415    pub fn txn_id(&self) -> u64 {
416        self.header.txn_id
417    }
418
419    /// Get LSN
420    #[inline]
421    pub fn lsn(&self) -> u64 {
422        self.header.lsn
423    }
424
425    /// Get timestamp
426    #[inline]
427    pub fn timestamp(&self) -> u64 {
428        self.header.timestamp
429    }
430
431    /// Get entry type
432    #[inline]
433    pub fn entry_type(&self) -> Option<WalEntryType> {
434        WalEntryType::from_u8(self.header.entry_type)
435    }
436
437    /// Get number of fields
438    #[inline]
439    pub fn field_count(&self) -> usize {
440        self.field_count
441    }
442
443    /// Get field by index (zero-copy)
444    #[inline]
445    pub fn get_field(&self, index: usize) -> Option<&'a [u8]> {
446        if index >= self.field_count {
447            return None;
448        }
449
450        let desc_offset = self.fields_offset + index * FIELD_DESCRIPTOR_SIZE;
451        let desc_bytes = self
452            .data
453            .get(desc_offset..desc_offset + FIELD_DESCRIPTOR_SIZE)?;
454
455        let offset = u32::from_le_bytes(desc_bytes[0..4].try_into().ok()?) as usize;
456        let length = u32::from_le_bytes(desc_bytes[4..8].try_into().ok()?) as usize;
457
458        let start = self.data_offset + offset;
459        self.data.get(start..start + length)
460    }
461
462    /// Get key field (first field by convention)
463    #[inline]
464    pub fn key(&self) -> Option<&'a [u8]> {
465        self.get_field(0)
466    }
467
468    /// Get value field (second field by convention)
469    #[inline]
470    pub fn value(&self) -> Option<&'a [u8]> {
471        self.get_field(1)
472    }
473
474    /// Iterate over all fields
475    pub fn fields(&self) -> impl Iterator<Item = &'a [u8]> + '_ {
476        (0..self.field_count).filter_map(|i| self.get_field(i))
477    }
478}
479
480// =============================================================================
481// Zero-Copy Batch Writer
482// =============================================================================
483
484/// Batch writer for multiple WAL entries
485///
486/// Optimized for group commit scenarios where multiple entries
487/// are written together.
488pub struct WalBatchWriter {
489    /// Accumulated entries
490    entries: Vec<Vec<u8>>,
491    /// Total size
492    total_size: usize,
493}
494
495impl WalBatchWriter {
496    pub fn new() -> Self {
497        Self {
498            entries: Vec::new(),
499            total_size: 0,
500        }
501    }
502
503    pub fn with_capacity(capacity: usize) -> Self {
504        Self {
505            entries: Vec::with_capacity(capacity),
506            total_size: 0,
507        }
508    }
509
510    /// Add entry to batch
511    pub fn add(&mut self, entry: WalEntryBuilder) {
512        let bytes = entry.build();
513        self.total_size += bytes.len();
514        self.entries.push(bytes);
515    }
516
517    /// Get number of entries
518    pub fn len(&self) -> usize {
519        self.entries.len()
520    }
521
522    /// Check if empty
523    pub fn is_empty(&self) -> bool {
524        self.entries.is_empty()
525    }
526
527    /// Get total size in bytes
528    pub fn total_size(&self) -> usize {
529        self.total_size
530    }
531
532    /// Build into single contiguous buffer
533    pub fn build(&self) -> Vec<u8> {
534        let mut buf = Vec::with_capacity(self.total_size + 8);
535
536        // Write entry count
537        buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
538        // Write total size
539        buf.extend_from_slice(&(self.total_size as u32).to_le_bytes());
540
541        // Write all entries
542        for entry in &self.entries {
543            buf.extend_from_slice(entry);
544        }
545
546        buf
547    }
548
549    /// Clear the batch
550    pub fn clear(&mut self) {
551        self.entries.clear();
552        self.total_size = 0;
553    }
554}
555
556impl Default for WalBatchWriter {
557    fn default() -> Self {
558        Self::new()
559    }
560}
561
562// =============================================================================
563// Zero-Copy Batch Reader
564// =============================================================================
565
566/// Batch reader for multiple WAL entries
567pub struct WalBatchReader<'a> {
568    data: &'a [u8],
569    entry_count: usize,
570    #[allow(dead_code)]
571    current_offset: usize,
572}
573
574impl<'a> WalBatchReader<'a> {
575    pub fn from_bytes(data: &'a [u8]) -> Option<Self> {
576        if data.len() < 8 {
577            return None;
578        }
579
580        let entry_count = u32::from_le_bytes(data[0..4].try_into().ok()?) as usize;
581        let _total_size = u32::from_le_bytes(data[4..8].try_into().ok()?) as usize;
582
583        Some(Self {
584            data,
585            entry_count,
586            current_offset: 8,
587        })
588    }
589
590    /// Get number of entries in batch
591    pub fn entry_count(&self) -> usize {
592        self.entry_count
593    }
594
595    /// Iterate over entries
596    pub fn entries(&self) -> WalBatchIter<'a> {
597        WalBatchIter {
598            data: self.data,
599            offset: 8,
600            remaining: self.entry_count,
601        }
602    }
603}
604
605/// Iterator over batch entries
606pub struct WalBatchIter<'a> {
607    data: &'a [u8],
608    offset: usize,
609    remaining: usize,
610}
611
612impl<'a> Iterator for WalBatchIter<'a> {
613    type Item = WalEntryReader<'a>;
614
615    fn next(&mut self) -> Option<Self::Item> {
616        if self.remaining == 0 {
617            return None;
618        }
619
620        let entry_data = &self.data[self.offset..];
621        let header = ZeroCopyHeader::read_from(entry_data)?;
622
623        let entry_len = header.total_length as usize;
624        let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
625
626        self.offset += entry_len;
627        self.remaining -= 1;
628
629        Some(entry)
630    }
631
632    fn size_hint(&self) -> (usize, Option<usize>) {
633        (self.remaining, Some(self.remaining))
634    }
635}
636
637impl<'a> ExactSizeIterator for WalBatchIter<'a> {}
638
639// =============================================================================
640// Memory-Mapped Zero-Copy Access
641// =============================================================================
642
643/// Memory-mapped WAL file for zero-copy access
644pub struct MmapWalReader {
645    /// Memory-mapped data
646    mmap: memmap2::Mmap,
647    /// File size
648    size: usize,
649}
650
651impl MmapWalReader {
652    /// Open WAL file for zero-copy reading
653    pub fn open(path: &std::path::Path) -> std::io::Result<Self> {
654        let file = std::fs::File::open(path)?;
655        let metadata = file.metadata()?;
656        let size = metadata.len() as usize;
657
658        // Safety: File is opened read-only
659        let mmap = unsafe { memmap2::Mmap::map(&file)? };
660
661        Ok(Self { mmap, size })
662    }
663
664    /// Get raw bytes
665    pub fn as_bytes(&self) -> &[u8] {
666        &self.mmap
667    }
668
669    /// Get file size
670    pub fn size(&self) -> usize {
671        self.size
672    }
673
674    /// Read entry at offset (zero-copy)
675    pub fn read_entry_at(&self, offset: usize) -> Option<WalEntryReader<'_>> {
676        if offset >= self.size {
677            return None;
678        }
679        WalEntryReader::from_bytes(&self.mmap[offset..])
680    }
681
682    /// Iterate over all entries
683    pub fn entries(&self) -> MmapWalIter<'_> {
684        MmapWalIter {
685            data: &self.mmap,
686            offset: 0,
687            size: self.size,
688        }
689    }
690}
691
692/// Iterator over memory-mapped WAL entries
693pub struct MmapWalIter<'a> {
694    data: &'a [u8],
695    offset: usize,
696    size: usize,
697}
698
699impl<'a> Iterator for MmapWalIter<'a> {
700    type Item = WalEntryReader<'a>;
701
702    fn next(&mut self) -> Option<Self::Item> {
703        if self.offset >= self.size {
704            return None;
705        }
706
707        let entry_data = &self.data[self.offset..];
708        if entry_data.len() < HEADER_SIZE {
709            return None;
710        }
711
712        let header = ZeroCopyHeader::read_from(entry_data)?;
713        if !header.validate() {
714            return None;
715        }
716
717        let entry_len = header.total_length as usize;
718        if self.offset + entry_len > self.size {
719            return None;
720        }
721
722        let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
723        self.offset += entry_len;
724
725        Some(entry)
726    }
727}
728
729// =============================================================================
730// Statistics
731// =============================================================================
732
/// Serialization statistics
///
/// Plain counters; callers update them through the `record_*` methods.
#[derive(Debug, Default)]
pub struct SerdeStats {
    /// Total entries written
    pub entries_written: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Total entries read
    pub entries_read: u64,
    /// Total bytes read (zero-copy, not actually copied)
    pub bytes_read: u64,
    /// CRC validation failures
    pub crc_failures: u64,
}

impl SerdeStats {
    /// Create a statistics block with every counter at zero.
    pub fn new() -> Self {
        Self {
            entries_written: 0,
            bytes_written: 0,
            entries_read: 0,
            bytes_read: 0,
            crc_failures: 0,
        }
    }

    /// Count one written entry of `bytes` bytes.
    pub fn record_write(&mut self, bytes: usize) {
        let added = bytes as u64;
        self.entries_written += 1;
        self.bytes_written += added;
    }

    /// Count one read entry of `bytes` bytes.
    pub fn record_read(&mut self, bytes: usize) {
        let added = bytes as u64;
        self.entries_read += 1;
        self.bytes_read += added;
    }

    /// Count one CRC validation failure.
    pub fn record_crc_failure(&mut self) {
        self.crc_failures += 1;
    }
}
767
768// =============================================================================
769// Tests
770// =============================================================================
771
#[cfg(test)]
mod tests {
    use super::*;

    /// A key/value entry survives a build/parse round trip.
    #[test]
    fn test_wal_entry_roundtrip() {
        let mut entry = WalEntryBuilder::new(42, 100, WalEntryType::Insert);
        entry.with_key(b"test_key");
        entry.with_value(b"test_value");

        let encoded = entry.build();
        let parsed = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(parsed.txn_id(), 42);
        assert_eq!(parsed.lsn(), 100);
        assert_eq!(parsed.entry_type(), Some(WalEntryType::Insert));
        assert_eq!(parsed.field_count(), 2);
        assert_eq!(parsed.key(), Some(b"test_key".as_slice()));
        assert_eq!(parsed.value(), Some(b"test_value".as_slice()));
    }

    /// The entry header can be read back both in place and by copy.
    #[test]
    fn test_wal_entry_zero_copy_header() {
        let original = WalEntryHeader::new(123, 456, WalEntryType::Update, 3);
        let mut raw = vec![0u8; WAL_ENTRY_HEADER_SIZE];
        original.write_to(&mut raw);

        // The in-place read is only checked when the buffer happens to be
        // suitably aligned; otherwise `read_from` returns `None`.
        if let Some(view) = WalEntryHeader::read_from(&raw) {
            assert_eq!(view.txn_id, 123);
            assert_eq!(view.lsn, 456);
            assert_eq!(view.entry_type, WalEntryType::Update as u8);
            assert_eq!(view.field_count, 3);
        }

        // The copying read must always succeed.
        let copied = WalEntryHeader::read_from_copy(&raw).unwrap();
        assert_eq!(copied.txn_id, 123);
        assert_eq!(copied.lsn, 456);
    }

    /// Flipping a body byte must make the CRC check reject the entry.
    #[test]
    fn test_wal_entry_crc_validation() {
        let mut entry = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        entry.with_key(b"key");

        let mut encoded = entry.build();

        // Untouched bytes parse.
        assert!(WalEntryReader::from_bytes(&encoded).is_some());

        // Byte 20 lies past the 16-byte outer header, i.e. inside the
        // checksummed region.
        if let Some(byte) = encoded.get_mut(20) {
            *byte ^= 0xFF;
        }

        // Corrupted bytes do not.
        assert!(WalEntryReader::from_bytes(&encoded).is_none());
    }

    /// Entries written through a batch can be read back in order.
    #[test]
    fn test_batch_writer_reader() {
        let mut writer = WalBatchWriter::new();

        for i in 0..10 {
            let key = format!("key_{}", i);
            let value = format!("value_{}", i);
            let mut entry = WalEntryBuilder::new(i, i * 10, WalEntryType::Insert);
            entry.with_key(key.as_bytes());
            entry.with_value(value.as_bytes());
            writer.add(entry);
        }

        assert_eq!(writer.len(), 10);

        let encoded = writer.build();
        let batch = WalBatchReader::from_bytes(&encoded).unwrap();

        assert_eq!(batch.entry_count(), 10);

        // NOTE(review): this loop checks whatever entries the iterator
        // yields but never asserts how many that was, so it still passes if
        // iteration stops early. Consider asserting the yielded count.
        for (i, entry) in batch.entries().enumerate() {
            let expected_key = format!("key_{}", i);
            assert_eq!(entry.txn_id(), i as u64);
            assert_eq!(entry.key(), Some(expected_key.as_bytes()));
        }
    }

    /// Several fields keep their order and contents.
    #[test]
    fn test_multiple_fields() {
        let expected: [&[u8]; 4] = [b"field_0", b"field_1", b"field_2", b"field_3"];

        let mut entry = WalEntryBuilder::new(1, 1, WalEntryType::Update);
        for field in expected {
            entry.add_field(field);
        }

        let encoded = entry.build();
        let parsed = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(parsed.field_count(), 4);

        let fields: Vec<_> = parsed.fields().collect();
        assert_eq!(fields.len(), 4);
        for (actual, wanted) in fields.iter().zip(expected) {
            assert_eq!(*actual, wanted);
        }
    }

    /// An entry without fields is valid.
    #[test]
    fn test_empty_fields() {
        let encoded = WalEntryBuilder::new(1, 1, WalEntryType::BeginTxn).build();
        let parsed = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(parsed.field_count(), 0);
        assert_eq!(parsed.entry_type(), Some(WalEntryType::BeginTxn));
    }

    /// A 1 MiB value round-trips unchanged.
    #[test]
    fn test_large_value() {
        let payload = vec![0xAB; 1024 * 1024]; // 1MB

        let mut entry = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        entry.with_key(b"large_key");
        entry.with_value(&payload);

        let encoded = entry.build();
        let parsed = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(parsed.value(), Some(payload.as_slice()));
    }

    /// `validate` accepts a fresh header and rejects a foreign magic.
    #[test]
    fn test_header_validation() {
        let good = ZeroCopyHeader::new(100, 0, 12345);
        assert!(good.validate());

        let mut tampered = good;
        tampered.magic = 0xDEADBEEF;
        assert!(!tampered.validate());
    }
}