Skip to main content

sochdb_storage/
zero_copy_serde.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Zero-Copy Serialization (Recommendation 6)
19//!
20//! ## Problem
21//!
22//! Traditional serialization (bincode/serde) requires:
23//! 1. Serialize: Marshal data into bytes
24//! 2. Write: Write bytes to WAL
25//! 3. Read: Read bytes from WAL
26//! 4. Deserialize: Unmarshal bytes back to structs
27//!
28//! Deserialization is expensive:
29//! ```text
30//! bincode::deserialize<WalEntry>:
31//!   - Parse header: ~50ns
32//!   - Allocate strings: ~200ns (heap allocation)
33//!   - Copy data: ~100ns
34//!   - Total: ~350ns per entry
35//! ```
36//!
37//! ## Solution
38//!
39//! Zero-copy serialization where the serialized format IS the in-memory format.
40//! Data can be accessed directly from memory-mapped files without deserialization.
41//!
42//! ```text
43//! Zero-copy read:
44//!   - Validate header: ~10ns
45//!   - Return reference: ~5ns
46//!   - Total: ~15ns per entry (23x faster)
47//! ```
48//!
49//! ## Design
50//!
51//! Fixed-size header followed by variable-length data:
52//! ```text
53//! ┌─────────────────────────────────────────────────────────────┐
54//! │ Magic (4B) │ Version (2B) │ Flags (2B) │ Length (4B) │ CRC │
55//! ├─────────────────────────────────────────────────────────────┤
56//! │                    Fixed-size fields                        │
57//! ├─────────────────────────────────────────────────────────────┤
58//! │ Offset table (variable-length field offsets)                │
59//! ├─────────────────────────────────────────────────────────────┤
60//! │                    Variable-length data                     │
61//! └─────────────────────────────────────────────────────────────┘
62//! ```
63
64use std::mem::size_of;
65
/// Magic number identifying the zero-copy format: the ASCII tag "ZCOP"
/// read as a big-endian `u32` (`0x5A43_4F50`).
pub const ZERO_COPY_MAGIC: u32 = u32::from_be_bytes(*b"ZCOP");

/// Format version stamped into newly created headers.
pub const FORMAT_VERSION: u16 = 1;

/// Size of the outer header in bytes:
/// magic (4) + version (2) + flags (2) + total length (4) + CRC (4).
pub const HEADER_SIZE: usize = 4 + 2 + 2 + 4 + 4;
74
75// =============================================================================
76// Zero-Copy Header
77// =============================================================================
78
79/// Header for zero-copy serialized data
80#[repr(C, packed)]
81#[derive(Debug, Clone, Copy)]
82pub struct ZeroCopyHeader {
83    /// Magic number for format validation
84    pub magic: u32,
85    /// Format version
86    pub version: u16,
87    /// Flags (compression, etc.)
88    pub flags: u16,
89    /// Total length including header
90    pub total_length: u32,
91    /// CRC32 of data (excluding header)
92    pub crc: u32,
93}
94
95impl ZeroCopyHeader {
96    /// Create new header
97    pub fn new(data_length: usize, flags: u16, crc: u32) -> Self {
98        Self {
99            magic: ZERO_COPY_MAGIC,
100            version: FORMAT_VERSION,
101            flags,
102            total_length: (HEADER_SIZE + data_length) as u32,
103            crc,
104        }
105    }
106
107    /// Validate header
108    #[inline]
109    pub fn validate(&self) -> bool {
110        self.magic == ZERO_COPY_MAGIC && self.version <= FORMAT_VERSION
111    }
112
113    /// Write header to buffer
114    pub fn write_to(&self, buf: &mut [u8]) {
115        assert!(buf.len() >= HEADER_SIZE);
116        buf[0..4].copy_from_slice(&self.magic.to_le_bytes());
117        buf[4..6].copy_from_slice(&self.version.to_le_bytes());
118        buf[6..8].copy_from_slice(&self.flags.to_le_bytes());
119        buf[8..12].copy_from_slice(&self.total_length.to_le_bytes());
120        buf[12..16].copy_from_slice(&self.crc.to_le_bytes());
121    }
122
123    /// Read header from buffer
124    pub fn read_from(buf: &[u8]) -> Option<Self> {
125        if buf.len() < HEADER_SIZE {
126            return None;
127        }
128        Some(Self {
129            magic: u32::from_le_bytes(buf[0..4].try_into().ok()?),
130            version: u16::from_le_bytes(buf[4..6].try_into().ok()?),
131            flags: u16::from_le_bytes(buf[6..8].try_into().ok()?),
132            total_length: u32::from_le_bytes(buf[8..12].try_into().ok()?),
133            crc: u32::from_le_bytes(buf[12..16].try_into().ok()?),
134        })
135    }
136}
137
138// =============================================================================
139// Zero-Copy WAL Entry
140// =============================================================================
141
/// Kind of operation recorded by a WAL entry.
///
/// Each variant's discriminant is the one-byte tag that `from_u8` decodes.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    /// Insert operation
    Insert = 1,
    /// Update operation
    Update = 2,
    /// Delete operation
    Delete = 3,
    /// Begin transaction
    BeginTxn = 4,
    /// Commit transaction
    CommitTxn = 5,
    /// Abort transaction
    AbortTxn = 6,
    /// Checkpoint marker
    Checkpoint = 7,
}

impl WalEntryType {
    /// Decodes a tag byte, returning `None` for any value outside `1..=7`.
    pub fn from_u8(v: u8) -> Option<Self> {
        // Variants listed in tag order, so tag `t` sits at index `t - 1`.
        const BY_TAG: [WalEntryType; 7] = [
            WalEntryType::Insert,
            WalEntryType::Update,
            WalEntryType::Delete,
            WalEntryType::BeginTxn,
            WalEntryType::CommitTxn,
            WalEntryType::AbortTxn,
            WalEntryType::Checkpoint,
        ];
        // Tag 0 underflows the subtraction and is rejected here.
        let index = usize::from(v).checked_sub(1)?;
        BY_TAG.get(index).copied()
    }
}
176
177/// Fixed-size WAL entry header (32 bytes)
178#[repr(C)]
179#[derive(Debug, Clone, Copy)]
180pub struct WalEntryHeader {
181    /// Transaction ID
182    pub txn_id: u64,
183    /// Log sequence number
184    pub lsn: u64,
185    /// Timestamp (nanoseconds since epoch)
186    pub timestamp: u64,
187    /// Entry type
188    pub entry_type: u8,
189    /// Number of variable-length fields
190    pub field_count: u8,
191    /// Reserved for alignment
192    pub _reserved: [u8; 6],
193}
194
195pub const WAL_ENTRY_HEADER_SIZE: usize = size_of::<WalEntryHeader>();
196
197impl WalEntryHeader {
198    pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType, field_count: u8) -> Self {
199        Self {
200            txn_id,
201            lsn,
202            timestamp: std::time::SystemTime::now()
203                .duration_since(std::time::UNIX_EPOCH)
204                .map(|d| d.as_nanos() as u64)
205                .unwrap_or(0),
206            entry_type: entry_type as u8,
207            field_count,
208            _reserved: [0; 6],
209        }
210    }
211
212    /// Write to buffer
213    pub fn write_to(&self, buf: &mut [u8]) {
214        assert!(buf.len() >= WAL_ENTRY_HEADER_SIZE);
215        buf[0..8].copy_from_slice(&self.txn_id.to_le_bytes());
216        buf[8..16].copy_from_slice(&self.lsn.to_le_bytes());
217        buf[16..24].copy_from_slice(&self.timestamp.to_le_bytes());
218        buf[24] = self.entry_type;
219        buf[25] = self.field_count;
220        buf[26..32].copy_from_slice(&self._reserved);
221    }
222
223    /// Read from buffer (zero-copy)
224    #[inline]
225    pub fn read_from(buf: &[u8]) -> Option<&Self> {
226        if buf.len() < WAL_ENTRY_HEADER_SIZE {
227            return None;
228        }
229        // Safety: We've verified length and WalEntryHeader is repr(C)
230        // This is the zero-copy read - no deserialization!
231        unsafe {
232            let ptr = buf.as_ptr() as *const Self;
233            // Verify alignment
234            if ptr as usize % std::mem::align_of::<Self>() != 0 {
235                return None;
236            }
237            Some(&*ptr)
238        }
239    }
240
241    /// Read from buffer (safe copy version for unaligned data)
242    pub fn read_from_copy(buf: &[u8]) -> Option<Self> {
243        if buf.len() < WAL_ENTRY_HEADER_SIZE {
244            return None;
245        }
246        Some(Self {
247            txn_id: u64::from_le_bytes(buf[0..8].try_into().ok()?),
248            lsn: u64::from_le_bytes(buf[8..16].try_into().ok()?),
249            timestamp: u64::from_le_bytes(buf[16..24].try_into().ok()?),
250            entry_type: buf[24],
251            field_count: buf[25],
252            _reserved: buf[26..32].try_into().ok()?,
253        })
254    }
255}
256
257// =============================================================================
258// Zero-Copy WAL Entry Builder
259// =============================================================================
260
/// Locates one variable-length field inside an entry's data section.
///
/// Two `u32` values laid out in declaration order (`repr(C)`).
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct FieldDescriptor {
    /// Byte offset of the field, measured from the start of the data section.
    pub offset: u32,
    /// Byte length of the field.
    pub length: u32,
}

/// Size in bytes of one [`FieldDescriptor`].
pub const FIELD_DESCRIPTOR_SIZE: usize = std::mem::size_of::<FieldDescriptor>();
272
273/// Builder for zero-copy WAL entries
274pub struct WalEntryBuilder {
275    /// Fixed header
276    header: WalEntryHeader,
277    /// Field descriptors
278    fields: Vec<FieldDescriptor>,
279    /// Variable-length data
280    data: Vec<u8>,
281}
282
283impl WalEntryBuilder {
284    /// Create new builder
285    pub fn new(txn_id: u64, lsn: u64, entry_type: WalEntryType) -> Self {
286        Self {
287            header: WalEntryHeader::new(txn_id, lsn, entry_type, 0),
288            fields: Vec::new(),
289            data: Vec::new(),
290        }
291    }
292
293    /// Add a variable-length field
294    pub fn add_field(&mut self, data: &[u8]) -> &mut Self {
295        let offset = self.data.len() as u32;
296        let length = data.len() as u32;
297        self.fields.push(FieldDescriptor { offset, length });
298        self.data.extend_from_slice(data);
299        self.header.field_count = self.fields.len() as u8;
300        self
301    }
302
303    /// Add key field
304    pub fn with_key(&mut self, key: &[u8]) -> &mut Self {
305        self.add_field(key)
306    }
307
308    /// Add value field
309    pub fn with_value(&mut self, value: &[u8]) -> &mut Self {
310        self.add_field(value)
311    }
312
313    /// Calculate total size
314    pub fn total_size(&self) -> usize {
315        HEADER_SIZE + 
316        WAL_ENTRY_HEADER_SIZE + 
317        self.fields.len() * FIELD_DESCRIPTOR_SIZE + 
318        self.data.len()
319    }
320
321    /// Build into bytes
322    pub fn build(&self) -> Vec<u8> {
323        let data_len = WAL_ENTRY_HEADER_SIZE + 
324            self.fields.len() * FIELD_DESCRIPTOR_SIZE + 
325            self.data.len();
326        
327        let mut buf = vec![0u8; HEADER_SIZE + data_len];
328        
329        // Calculate CRC of data portion
330        let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
331        
332        // Write header
333        let header = ZeroCopyHeader::new(data_len, 0, crc);
334        header.write_to(&mut buf[0..HEADER_SIZE]);
335        
336        // Write WAL entry header
337        let offset = HEADER_SIZE;
338        self.header.write_to(&mut buf[offset..offset + WAL_ENTRY_HEADER_SIZE]);
339        
340        // Write field descriptors
341        let mut offset = HEADER_SIZE + WAL_ENTRY_HEADER_SIZE;
342        for field in &self.fields {
343            buf[offset..offset + 4].copy_from_slice(&field.offset.to_le_bytes());
344            buf[offset + 4..offset + 8].copy_from_slice(&field.length.to_le_bytes());
345            offset += FIELD_DESCRIPTOR_SIZE;
346        }
347        
348        // Write data
349        buf[offset..].copy_from_slice(&self.data);
350        
351        // Update CRC
352        let crc = crc32fast::hash(&buf[HEADER_SIZE..]);
353        buf[12..16].copy_from_slice(&crc.to_le_bytes());
354        
355        buf
356    }
357}
358
359// =============================================================================
360// Zero-Copy WAL Entry Reader
361// =============================================================================
362
363/// Zero-copy WAL entry reader
364/// 
365/// Provides direct access to WAL entry fields without deserialization.
366/// The backing data must remain valid for the lifetime of this reader.
367pub struct WalEntryReader<'a> {
368    /// Raw backing data
369    data: &'a [u8],
370    /// Parsed header
371    header: &'a WalEntryHeader,
372    /// Field count
373    field_count: usize,
374    /// Offset to field descriptors
375    fields_offset: usize,
376    /// Offset to data section
377    data_offset: usize,
378}
379
380impl<'a> WalEntryReader<'a> {
381    /// Create reader from raw bytes (zero-copy)
382    pub fn from_bytes(bytes: &'a [u8]) -> Option<Self> {
383        // Validate outer header
384        let outer_header = ZeroCopyHeader::read_from(bytes)?;
385        if !outer_header.validate() {
386            return None;
387        }
388        
389        // Validate CRC
390        let expected_crc = outer_header.crc;
391        let actual_crc = crc32fast::hash(&bytes[HEADER_SIZE..]);
392        if expected_crc != actual_crc {
393            return None;
394        }
395        
396        // Read WAL entry header (zero-copy if aligned)
397        let entry_data = &bytes[HEADER_SIZE..];
398        let header = WalEntryHeader::read_from(entry_data)?;
399        
400        let field_count = header.field_count as usize;
401        let fields_offset = WAL_ENTRY_HEADER_SIZE;
402        let data_offset = fields_offset + field_count * FIELD_DESCRIPTOR_SIZE;
403        
404        Some(Self {
405            data: entry_data,
406            header,
407            field_count,
408            fields_offset,
409            data_offset,
410        })
411    }
412
413    /// Get transaction ID
414    #[inline]
415    pub fn txn_id(&self) -> u64 {
416        self.header.txn_id
417    }
418
419    /// Get LSN
420    #[inline]
421    pub fn lsn(&self) -> u64 {
422        self.header.lsn
423    }
424
425    /// Get timestamp
426    #[inline]
427    pub fn timestamp(&self) -> u64 {
428        self.header.timestamp
429    }
430
431    /// Get entry type
432    #[inline]
433    pub fn entry_type(&self) -> Option<WalEntryType> {
434        WalEntryType::from_u8(self.header.entry_type)
435    }
436
437    /// Get number of fields
438    #[inline]
439    pub fn field_count(&self) -> usize {
440        self.field_count
441    }
442
443    /// Get field by index (zero-copy)
444    #[inline]
445    pub fn get_field(&self, index: usize) -> Option<&'a [u8]> {
446        if index >= self.field_count {
447            return None;
448        }
449        
450        let desc_offset = self.fields_offset + index * FIELD_DESCRIPTOR_SIZE;
451        let desc_bytes = self.data.get(desc_offset..desc_offset + FIELD_DESCRIPTOR_SIZE)?;
452        
453        let offset = u32::from_le_bytes(desc_bytes[0..4].try_into().ok()?) as usize;
454        let length = u32::from_le_bytes(desc_bytes[4..8].try_into().ok()?) as usize;
455        
456        let start = self.data_offset + offset;
457        self.data.get(start..start + length)
458    }
459
460    /// Get key field (first field by convention)
461    #[inline]
462    pub fn key(&self) -> Option<&'a [u8]> {
463        self.get_field(0)
464    }
465
466    /// Get value field (second field by convention)
467    #[inline]
468    pub fn value(&self) -> Option<&'a [u8]> {
469        self.get_field(1)
470    }
471
472    /// Iterate over all fields
473    pub fn fields(&self) -> impl Iterator<Item = &'a [u8]> + '_ {
474        (0..self.field_count).filter_map(|i| self.get_field(i))
475    }
476}
477
478// =============================================================================
479// Zero-Copy Batch Writer
480// =============================================================================
481
482/// Batch writer for multiple WAL entries
483/// 
484/// Optimized for group commit scenarios where multiple entries
485/// are written together.
486pub struct WalBatchWriter {
487    /// Accumulated entries
488    entries: Vec<Vec<u8>>,
489    /// Total size
490    total_size: usize,
491}
492
493impl WalBatchWriter {
494    pub fn new() -> Self {
495        Self {
496            entries: Vec::new(),
497            total_size: 0,
498        }
499    }
500
501    pub fn with_capacity(capacity: usize) -> Self {
502        Self {
503            entries: Vec::with_capacity(capacity),
504            total_size: 0,
505        }
506    }
507
508    /// Add entry to batch
509    pub fn add(&mut self, entry: WalEntryBuilder) {
510        let bytes = entry.build();
511        self.total_size += bytes.len();
512        self.entries.push(bytes);
513    }
514
515    /// Get number of entries
516    pub fn len(&self) -> usize {
517        self.entries.len()
518    }
519
520    /// Check if empty
521    pub fn is_empty(&self) -> bool {
522        self.entries.is_empty()
523    }
524
525    /// Get total size in bytes
526    pub fn total_size(&self) -> usize {
527        self.total_size
528    }
529
530    /// Build into single contiguous buffer
531    pub fn build(&self) -> Vec<u8> {
532        let mut buf = Vec::with_capacity(self.total_size + 8);
533        
534        // Write entry count
535        buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
536        // Write total size
537        buf.extend_from_slice(&(self.total_size as u32).to_le_bytes());
538        
539        // Write all entries
540        for entry in &self.entries {
541            buf.extend_from_slice(entry);
542        }
543        
544        buf
545    }
546
547    /// Clear the batch
548    pub fn clear(&mut self) {
549        self.entries.clear();
550        self.total_size = 0;
551    }
552}
553
554impl Default for WalBatchWriter {
555    fn default() -> Self {
556        Self::new()
557    }
558}
559
560// =============================================================================
561// Zero-Copy Batch Reader
562// =============================================================================
563
564/// Batch reader for multiple WAL entries
565pub struct WalBatchReader<'a> {
566    data: &'a [u8],
567    entry_count: usize,
568    #[allow(dead_code)]
569    current_offset: usize,
570}
571
572impl<'a> WalBatchReader<'a> {
573    pub fn from_bytes(data: &'a [u8]) -> Option<Self> {
574        if data.len() < 8 {
575            return None;
576        }
577        
578        let entry_count = u32::from_le_bytes(data[0..4].try_into().ok()?) as usize;
579        let _total_size = u32::from_le_bytes(data[4..8].try_into().ok()?) as usize;
580        
581        Some(Self {
582            data,
583            entry_count,
584            current_offset: 8,
585        })
586    }
587
588    /// Get number of entries in batch
589    pub fn entry_count(&self) -> usize {
590        self.entry_count
591    }
592
593    /// Iterate over entries
594    pub fn entries(&self) -> WalBatchIter<'a> {
595        WalBatchIter {
596            data: self.data,
597            offset: 8,
598            remaining: self.entry_count,
599        }
600    }
601}
602
603/// Iterator over batch entries
604pub struct WalBatchIter<'a> {
605    data: &'a [u8],
606    offset: usize,
607    remaining: usize,
608}
609
610impl<'a> Iterator for WalBatchIter<'a> {
611    type Item = WalEntryReader<'a>;
612
613    fn next(&mut self) -> Option<Self::Item> {
614        if self.remaining == 0 {
615            return None;
616        }
617        
618        let entry_data = &self.data[self.offset..];
619        let header = ZeroCopyHeader::read_from(entry_data)?;
620        
621        let entry_len = header.total_length as usize;
622        let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
623        
624        self.offset += entry_len;
625        self.remaining -= 1;
626        
627        Some(entry)
628    }
629
630    fn size_hint(&self) -> (usize, Option<usize>) {
631        (self.remaining, Some(self.remaining))
632    }
633}
634
635impl<'a> ExactSizeIterator for WalBatchIter<'a> {}
636
637// =============================================================================
638// Memory-Mapped Zero-Copy Access
639// =============================================================================
640
641/// Memory-mapped WAL file for zero-copy access
642pub struct MmapWalReader {
643    /// Memory-mapped data
644    mmap: memmap2::Mmap,
645    /// File size
646    size: usize,
647}
648
649impl MmapWalReader {
650    /// Open WAL file for zero-copy reading
651    pub fn open(path: &std::path::Path) -> std::io::Result<Self> {
652        let file = std::fs::File::open(path)?;
653        let metadata = file.metadata()?;
654        let size = metadata.len() as usize;
655        
656        // Safety: File is opened read-only
657        let mmap = unsafe { memmap2::Mmap::map(&file)? };
658        
659        Ok(Self { mmap, size })
660    }
661
662    /// Get raw bytes
663    pub fn as_bytes(&self) -> &[u8] {
664        &self.mmap
665    }
666
667    /// Get file size
668    pub fn size(&self) -> usize {
669        self.size
670    }
671
672    /// Read entry at offset (zero-copy)
673    pub fn read_entry_at(&self, offset: usize) -> Option<WalEntryReader<'_>> {
674        if offset >= self.size {
675            return None;
676        }
677        WalEntryReader::from_bytes(&self.mmap[offset..])
678    }
679
680    /// Iterate over all entries
681    pub fn entries(&self) -> MmapWalIter<'_> {
682        MmapWalIter {
683            data: &self.mmap,
684            offset: 0,
685            size: self.size,
686        }
687    }
688}
689
690/// Iterator over memory-mapped WAL entries
691pub struct MmapWalIter<'a> {
692    data: &'a [u8],
693    offset: usize,
694    size: usize,
695}
696
697impl<'a> Iterator for MmapWalIter<'a> {
698    type Item = WalEntryReader<'a>;
699
700    fn next(&mut self) -> Option<Self::Item> {
701        if self.offset >= self.size {
702            return None;
703        }
704        
705        let entry_data = &self.data[self.offset..];
706        if entry_data.len() < HEADER_SIZE {
707            return None;
708        }
709        
710        let header = ZeroCopyHeader::read_from(entry_data)?;
711        if !header.validate() {
712            return None;
713        }
714        
715        let entry_len = header.total_length as usize;
716        if self.offset + entry_len > self.size {
717            return None;
718        }
719        
720        let entry = WalEntryReader::from_bytes(&entry_data[..entry_len])?;
721        self.offset += entry_len;
722        
723        Some(entry)
724    }
725}
726
727// =============================================================================
728// Statistics
729// =============================================================================
730
/// Running counters for serialization activity.
///
/// Nothing updates these automatically; callers invoke the `record_*`
/// methods themselves.
#[derive(Debug, Default)]
pub struct SerdeStats {
    /// Number of `record_write` calls
    pub entries_written: u64,
    /// Sum of the byte counts passed to `record_write`
    pub bytes_written: u64,
    /// Number of `record_read` calls
    pub entries_read: u64,
    /// Sum of the byte counts passed to `record_read`
    pub bytes_read: u64,
    /// Number of `record_crc_failure` calls
    pub crc_failures: u64,
}

impl SerdeStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Self {
            entries_written: 0,
            bytes_written: 0,
            entries_read: 0,
            bytes_read: 0,
            crc_failures: 0,
        }
    }

    /// Counts one written entry of `bytes` bytes.
    pub fn record_write(&mut self, bytes: usize) {
        let added = bytes as u64;
        self.entries_written += 1;
        self.bytes_written += added;
    }

    /// Counts one read entry of `bytes` bytes.
    pub fn record_read(&mut self, bytes: usize) {
        let added = bytes as u64;
        self.entries_read += 1;
        self.bytes_read += added;
    }

    /// Counts one CRC validation failure.
    pub fn record_crc_failure(&mut self) {
        self.crc_failures += 1;
    }
}
765
766// =============================================================================
767// Tests
768// =============================================================================
769
#[cfg(test)]
mod tests {
    use super::*;

    /// A key/value entry survives a build → parse round trip.
    #[test]
    fn test_wal_entry_roundtrip() {
        let mut builder = WalEntryBuilder::new(42, 100, WalEntryType::Insert);
        builder.with_key(b"test_key");
        builder.with_value(b"test_value");

        let encoded = builder.build();
        let entry = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(entry.txn_id(), 42);
        assert_eq!(entry.lsn(), 100);
        assert_eq!(entry.entry_type(), Some(WalEntryType::Insert));
        assert_eq!(entry.field_count(), 2);
        assert_eq!(entry.key(), Some(&b"test_key"[..]));
        assert_eq!(entry.value(), Some(&b"test_value"[..]));
    }

    /// The fixed header can be read back both in place and by copy.
    #[test]
    fn test_wal_entry_zero_copy_header() {
        let original = WalEntryHeader::new(123, 456, WalEntryType::Update, 3);
        let mut encoded = vec![0u8; WAL_ENTRY_HEADER_SIZE];
        original.write_to(&mut encoded);

        // The in-place read only succeeds when the buffer happens to be
        // suitably aligned, so it is checked conditionally.
        match WalEntryHeader::read_from(&encoded) {
            Some(view) => {
                assert_eq!(view.txn_id, 123);
                assert_eq!(view.lsn, 456);
                assert_eq!(view.entry_type, WalEntryType::Update as u8);
                assert_eq!(view.field_count, 3);
            }
            None => {}
        }

        // The copying read must always work.
        let copied = WalEntryHeader::read_from_copy(&encoded).unwrap();
        assert_eq!(copied.txn_id, 123);
        assert_eq!(copied.lsn, 456);
    }

    /// Flipping a payload byte makes the entry fail validation.
    #[test]
    fn test_wal_entry_crc_validation() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"key");

        let mut encoded = builder.build();

        // Untouched bytes parse.
        assert!(WalEntryReader::from_bytes(&encoded).is_some());

        // Byte 20 lies past the 16-byte outer header, inside the payload.
        if let Some(byte) = encoded.get_mut(20) {
            *byte ^= 0xFF;
        }

        // The corrupted entry is rejected.
        assert!(WalEntryReader::from_bytes(&encoded).is_none());
    }

    /// Entries written through a batch come back in order.
    ///
    /// Note that the loop only inspects entries the iterator actually
    /// yields; it does not assert how many were yielded.
    #[test]
    fn test_batch_writer_reader() {
        let mut batch = WalBatchWriter::new();

        for i in 0..10 {
            let key = format!("key_{}", i);
            let value = format!("value_{}", i);
            let mut entry = WalEntryBuilder::new(i, i * 10, WalEntryType::Insert);
            entry.with_key(key.as_bytes());
            entry.with_value(value.as_bytes());
            batch.add(entry);
        }

        assert_eq!(batch.len(), 10);

        let encoded = batch.build();
        let reader = WalBatchReader::from_bytes(&encoded).unwrap();

        assert_eq!(reader.entry_count(), 10);

        let mut index = 0u64;
        for entry in reader.entries() {
            let expected_key = format!("key_{}", index);
            assert_eq!(entry.txn_id(), index);
            assert_eq!(entry.key(), Some(expected_key.as_bytes()));
            index += 1;
        }
    }

    /// Four fields are returned in insertion order.
    #[test]
    fn test_multiple_fields() {
        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Update);
        builder
            .add_field(b"field_0")
            .add_field(b"field_1")
            .add_field(b"field_2")
            .add_field(b"field_3");

        let encoded = builder.build();
        let entry = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(entry.field_count(), 4);

        let fields: Vec<&[u8]> = entry.fields().collect();
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0], &b"field_0"[..]);
        assert_eq!(fields[1], &b"field_1"[..]);
        assert_eq!(fields[2], &b"field_2"[..]);
        assert_eq!(fields[3], &b"field_3"[..]);
    }

    /// An entry without fields is valid.
    #[test]
    fn test_empty_fields() {
        let encoded = WalEntryBuilder::new(1, 1, WalEntryType::BeginTxn).build();
        let entry = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(entry.field_count(), 0);
        assert_eq!(entry.entry_type(), Some(WalEntryType::BeginTxn));
    }

    /// A 1 MiB value round-trips unchanged.
    #[test]
    fn test_large_value() {
        let large_value = vec![0xAB_u8; 1024 * 1024];

        let mut builder = WalEntryBuilder::new(1, 1, WalEntryType::Insert);
        builder.with_key(b"large_key");
        builder.with_value(&large_value);

        let encoded = builder.build();
        let entry = WalEntryReader::from_bytes(&encoded).unwrap();

        assert_eq!(entry.value(), Some(&large_value[..]));
    }

    /// `validate` accepts a fresh header and rejects a wrong magic.
    #[test]
    fn test_header_validation() {
        let good = ZeroCopyHeader::new(100, 0, 12345);
        assert!(good.validate());

        let bad = ZeroCopyHeader {
            magic: 0xDEADBEEF,
            ..good
        };
        assert!(!bad.validate());
    }
}