Skip to main content

sqlitegraph/backend/native/v2/wal/
record.rs

1//! V2 WAL record types and serialization.
2//!
3//! This module defines the record format for V2 WAL operations, providing
4//! efficient serialization/deserialization with minimal overhead and
5//! cluster-affinity organization for optimal I/O patterns.
6
7use crate::backend::native::v2::edge_cluster::{CompactEdgeRecord, Direction};
8use crate::backend::native::{NativeBackendError, NativeResult};
/// One-byte tag identifying the kind of operation stored in a V2 WAL record.
///
/// The numeric discriminants are written to the log as the first byte of every
/// record, so they must never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum V2WALRecordType {
    /// A node was created together with its initial data.
    NodeInsert = 1,

    /// An existing node was modified.
    NodeUpdate = 2,

    /// A node was logically deleted.
    NodeDelete = 3,

    /// An edge cluster was created.
    ClusterCreate = 4,

    /// An edge was inserted into a cluster.
    EdgeInsert = 5,

    /// An edge inside a cluster was modified.
    EdgeUpdate = 6,

    /// An edge was logically deleted.
    EdgeDelete = 7,

    /// A string table entry was created.
    StringInsert = 8,

    /// A free space block was allocated.
    FreeSpaceAllocate = 9,

    /// A free space block was deallocated.
    FreeSpaceDeallocate = 10,

    /// Marks the beginning of a transaction.
    TransactionBegin = 11,

    /// Marks the commit of a transaction.
    TransactionCommit = 12,

    /// Marks the rollback of a transaction.
    TransactionRollback = 13,

    /// Marks a checkpoint.
    Checkpoint = 14,

    /// The database header was updated.
    HeaderUpdate = 15,

    /// Marks the end of a WAL segment.
    SegmentEnd = 16,

    /// Marks the prepare phase of a two-phase commit.
    TransactionPrepare = 17,

    /// Marks the abort of a two-phase commit.
    TransactionAbort = 18,

    /// Marks the creation of a savepoint.
    SavepointCreate = 19,

    /// Marks a rollback to a savepoint.
    SavepointRollback = 20,

    /// Marks the release of a savepoint.
    SavepointRelease = 21,

    /// Marks the creation of a backup.
    BackupCreate = 22,

    /// Marks the restore of a backup.
    BackupRestore = 23,

    /// Marks the acquisition of a lock.
    LockAcquire = 24,

    /// Marks the release of a lock.
    LockRelease = 25,

    /// Marks an index update.
    IndexUpdate = 26,

    /// Marks a statistics update.
    StatisticsUpdate = 27,

    /// A contiguous region was allocated.
    AllocateContiguous = 28,

    /// A contiguous region allocation was committed.
    CommitContiguous = 29,

    /// A contiguous region allocation was rolled back.
    RollbackContiguous = 30,

    /// A key-value pair was set in the KV store.
    KvSet = 31,

    /// A key was deleted from the KV store.
    KvDelete = 32,
}

impl V2WALRecordType {
    /// Record types classified as data-modifying; this list is the single
    /// source of truth behind [`Self::data_modifying`] and
    /// [`Self::requires_checkpoint`].
    const DATA_MODIFYING: &'static [V2WALRecordType] = &[
        Self::NodeInsert,
        Self::NodeUpdate,
        Self::NodeDelete,
        Self::ClusterCreate,
        Self::EdgeInsert,
        Self::EdgeUpdate,
        Self::EdgeDelete,
        Self::StringInsert,
        Self::FreeSpaceAllocate,
        Self::FreeSpaceDeallocate,
        Self::HeaderUpdate,
        Self::AllocateContiguous,
        Self::CommitContiguous,
        Self::RollbackContiguous,
        Self::KvSet,
        Self::KvDelete,
    ];

    /// Record types classified as transaction control; backs
    /// [`Self::transaction_control`] and [`Self::is_transaction_control`].
    const TRANSACTION_CONTROL: &'static [V2WALRecordType] = &[
        Self::TransactionBegin,
        Self::TransactionCommit,
        Self::TransactionRollback,
    ];

    /// Returns every record type that modifies data and therefore requires
    /// checkpointing.
    pub fn data_modifying() -> &'static [V2WALRecordType] {
        Self::DATA_MODIFYING
    }

    /// Returns the transaction control record types.
    ///
    /// Only begin, commit and rollback are listed; the two-phase-commit
    /// (prepare/abort) and savepoint markers are not part of this set.
    pub fn transaction_control() -> &'static [V2WALRecordType] {
        Self::TRANSACTION_CONTROL
    }

    /// Returns `true` when this record type appears in
    /// [`Self::data_modifying`].
    pub fn requires_checkpoint(&self) -> bool {
        Self::data_modifying().iter().any(|kind| kind == self)
    }

    /// Returns `true` when this record type appears in
    /// [`Self::transaction_control`].
    pub fn is_transaction_control(&self) -> bool {
        Self::transaction_control().iter().any(|kind| kind == self)
    }
}
152
153impl TryFrom<u8> for V2WALRecordType {
154    type Error = NativeBackendError;
155
156    fn try_from(value: u8) -> NativeResult<Self> {
157        match value {
158            1 => Ok(Self::NodeInsert),
159            2 => Ok(Self::NodeUpdate),
160            3 => Ok(Self::NodeDelete),
161            4 => Ok(Self::ClusterCreate),
162            5 => Ok(Self::EdgeInsert),
163            6 => Ok(Self::EdgeUpdate),
164            7 => Ok(Self::EdgeDelete),
165            8 => Ok(Self::StringInsert),
166            9 => Ok(Self::FreeSpaceAllocate),
167            10 => Ok(Self::FreeSpaceDeallocate),
168            11 => Ok(Self::TransactionBegin),
169            12 => Ok(Self::TransactionCommit),
170            13 => Ok(Self::TransactionRollback),
171            14 => Ok(Self::Checkpoint),
172            15 => Ok(Self::HeaderUpdate),
173            16 => Ok(Self::SegmentEnd),
174            17 => Ok(Self::TransactionPrepare),
175            18 => Ok(Self::TransactionAbort),
176            19 => Ok(Self::SavepointCreate),
177            20 => Ok(Self::SavepointRollback),
178            21 => Ok(Self::SavepointRelease),
179            22 => Ok(Self::BackupCreate),
180            23 => Ok(Self::BackupRestore),
181            24 => Ok(Self::LockAcquire),
182            25 => Ok(Self::LockRelease),
183            26 => Ok(Self::IndexUpdate),
184            27 => Ok(Self::StatisticsUpdate),
185            28 => Ok(Self::AllocateContiguous),
186            29 => Ok(Self::CommitContiguous),
187            30 => Ok(Self::RollbackContiguous),
188            31 => Ok(Self::KvSet),
189            32 => Ok(Self::KvDelete),
190            _ => Err(NativeBackendError::CorruptStringTable {
191                reason: format!("unknown WAL record type: {}", value),
192            }),
193        }
194    }
195}
196
197/// V2 WAL record containing operation data
198#[derive(Debug, Clone)]
199pub enum V2WALRecord {
200    /// Node creation with initial data
201    NodeInsert {
202        node_id: i64,
203        slot_offset: u64,
204        node_data: Vec<u8>,
205    },
206
207    /// Node modification/update
208    NodeUpdate {
209        node_id: i64,
210        slot_offset: u64,
211        old_data: Vec<u8>,
212        new_data: Vec<u8>,
213    },
214
215    /// Node deletion (logical) with complete before-image capture
216    NodeDelete {
217        node_id: i64,
218        slot_offset: u64,
219        old_data: Vec<u8>,
220        /// Outgoing edges captured before deletion (for rollback)
221        outgoing_edges: Vec<CompactEdgeRecord>,
222        /// Incoming edges captured before deletion (for rollback)
223        incoming_edges: Vec<CompactEdgeRecord>,
224    },
225
226    /// Edge cluster creation
227    ClusterCreate {
228        node_id: i64,
229        direction: Direction,
230        cluster_offset: u64,
231        cluster_size: u32,
232        edge_data: Vec<u8>,
233    },
234
235    /// Edge insertion into cluster
236    EdgeInsert {
237        cluster_key: (i64, Direction), // (node_id, direction)
238        edge_record: CompactEdgeRecord,
239        insertion_point: u32,
240    },
241
242    /// Edge modification within cluster
243    EdgeUpdate {
244        cluster_key: (i64, Direction),
245        old_edge: CompactEdgeRecord,
246        new_edge: CompactEdgeRecord,
247        position: u32,
248    },
249
250    /// Edge deletion (logical)
251    EdgeDelete {
252        cluster_key: (i64, Direction),
253        old_edge: CompactEdgeRecord,
254        position: u32,
255    },
256
257    /// String table entry creation
258    StringInsert {
259        string_id: u32,
260        string_value: String,
261    },
262
263    /// Free space block allocation
264    FreeSpaceAllocate {
265        block_offset: u64,
266        block_size: u32,
267        block_type: u8,
268    },
269
270    /// Free space block deallocation
271    FreeSpaceDeallocate {
272        block_offset: u64,
273        block_size: u32,
274        block_type: u8,
275    },
276
277    /// Transaction begin marker
278    TransactionBegin { tx_id: u64, timestamp: u64 },
279
280    /// Transaction commit marker
281    TransactionCommit { tx_id: u64, timestamp: u64 },
282
283    /// Transaction rollback marker
284    TransactionRollback { tx_id: u64, timestamp: u64 },
285
286    /// Checkpoint marker
287    Checkpoint {
288        checkpointed_lsn: u64,
289        timestamp: u64,
290    },
291
292    /// Database header update
293    HeaderUpdate {
294        header_offset: u64,
295        old_data: Vec<u8>,
296        new_data: Vec<u8>,
297    },
298
299    /// End of WAL segment marker
300    SegmentEnd { segment_lsn: u64, checksum: u32 },
301
302    /// Transaction prepare phase marker (two-phase commit)
303    TransactionPrepare {
304        tx_id: u64,
305        record_count: u64,
306        timestamp: std::time::SystemTime,
307    },
308
309    /// Transaction abort marker (two-phase commit)
310    TransactionAbort {
311        tx_id: u64,
312        abort_reason: String,
313        timestamp: std::time::SystemTime,
314    },
315
316    /// Savepoint creation marker
317    SavepointCreate {
318        tx_id: u64,
319        savepoint_id: String,
320        timestamp: std::time::SystemTime,
321    },
322
323    /// Savepoint rollback marker
324    SavepointRollback {
325        tx_id: u64,
326        savepoint_id: String,
327        timestamp: std::time::SystemTime,
328    },
329
330    /// Savepoint release marker
331    SavepointRelease {
332        tx_id: u64,
333        savepoint_id: String,
334        timestamp: std::time::SystemTime,
335    },
336
337    /// Backup creation marker
338    BackupCreate {
339        backup_id: String,
340        backup_path: std::path::PathBuf,
341        timestamp: std::time::SystemTime,
342    },
343
344    /// Backup restore marker
345    BackupRestore {
346        backup_id: String,
347        backup_path: std::path::PathBuf,
348        target_path: std::path::PathBuf,
349        timestamp: std::time::SystemTime,
350    },
351
352    /// Lock acquisition marker
353    LockAcquire {
354        tx_id: u64,
355        resource_id: i64,
356        lock_type: u8,
357        timestamp: std::time::SystemTime,
358    },
359
360    /// Lock release marker
361    LockRelease {
362        tx_id: u64,
363        resource_id: i64,
364        timestamp: std::time::SystemTime,
365    },
366
367    /// Index update marker
368    IndexUpdate {
369        index_id: u32,
370        operation_type: u8,
371        key_data: Vec<u8>,
372        timestamp: std::time::SystemTime,
373    },
374
375    /// Statistics update marker
376    StatisticsUpdate {
377        stats_type: u8,
378        stats_data: Vec<u8>,
379        timestamp: std::time::SystemTime,
380    },
381
382    /// Contiguous region allocation
383    AllocateContiguous {
384        txn_id: u64,
385        region: ContiguousRegion,
386        timestamp: u64,
387    },
388
389    /// Contiguous region commit
390    CommitContiguous {
391        txn_id: u64,
392        region: ContiguousRegion,
393    },
394
395    /// Contiguous region rollback
396    RollbackContiguous { region: ContiguousRegion },
397
398    /// KV key-value set operation
399    KvSet {
400        key: Vec<u8>,
401        value_bytes: Vec<u8>,
402        value_type: u8,
403        ttl_seconds: Option<u64>,
404        version: u64,
405    },
406
407    /// KV key delete operation
408    KvDelete {
409        key: Vec<u8>,
410        old_value_bytes: Option<Vec<u8>>,
411        old_value_type: u8,
412        old_version: u64,
413    },
414}
415
/// Contiguous region descriptor for WAL records.
///
/// Describes the half-open byte range `[start_offset, start_offset + total_size)`
/// plus optional cluster layout metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContiguousRegion {
    /// Starting offset in bytes
    pub start_offset: u64,
    /// Total size in bytes
    pub total_size: u64,
    /// Number of clusters this region can hold
    pub cluster_count: u32,
    /// Fixed cluster size (stride between clusters)
    pub stride: u32,
}

impl ContiguousRegion {
    /// Exact length in bytes of the encoding produced by [`Self::serialize`]
    /// and required by [`Self::deserialize`]: two `u64` fields followed by two
    /// `u32` fields.
    pub const SERIALIZED_SIZE: usize = 8 + 8 + 4 + 4;

    /// Creates a region descriptor with no cluster metadata
    /// (`cluster_count` and `stride` are both zero).
    pub fn new(start_offset: u64, total_size: u64) -> Self {
        Self {
            start_offset,
            total_size,
            cluster_count: 0,
            stride: 0,
        }
    }

    /// Builder-style setter for the cluster metadata.
    pub fn with_clusters(mut self, cluster_count: u32, stride: u32) -> Self {
        self.cluster_count = cluster_count;
        self.stride = stride;
        self
    }

    /// Returns the exclusive end offset of this region.
    ///
    /// The sum saturates at `u64::MAX`. Descriptors can be decoded from raw
    /// log bytes, so an unchecked addition could panic (debug builds) or wrap
    /// to a small value (release builds) and corrupt overlap checks.
    pub fn end_offset(&self) -> u64 {
        self.start_offset.saturating_add(self.total_size)
    }

    /// Returns `true` when the byte ranges of `self` and `other` intersect.
    pub fn overlaps(&self, other: &ContiguousRegion) -> bool {
        self.start_offset < other.end_offset() && other.start_offset < self.end_offset()
    }

    /// Serializes the descriptor as 24 little-endian bytes in field order:
    /// `start_offset`, `total_size`, `cluster_count`, `stride`.
    pub fn serialize(&self) -> Vec<u8> {
        let mut buffer = Vec::with_capacity(Self::SERIALIZED_SIZE);
        buffer.extend_from_slice(&self.start_offset.to_le_bytes());
        buffer.extend_from_slice(&self.total_size.to_le_bytes());
        buffer.extend_from_slice(&self.cluster_count.to_le_bytes());
        buffer.extend_from_slice(&self.stride.to_le_bytes());
        buffer
    }

    /// Decodes a descriptor from the first 24 bytes of `data`; any trailing
    /// bytes are ignored.
    ///
    /// # Errors
    ///
    /// Returns a message describing the shortfall when `data` holds fewer
    /// than 24 bytes.
    pub fn deserialize(data: &[u8]) -> Result<Self, String> {
        // Callers pass slices of exactly the right width, so the copies below
        // cannot panic once the length check has passed.
        fn read_u64(bytes: &[u8]) -> u64 {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(bytes);
            u64::from_le_bytes(raw)
        }

        fn read_u32(bytes: &[u8]) -> u32 {
            let mut raw = [0u8; 4];
            raw.copy_from_slice(bytes);
            u32::from_le_bytes(raw)
        }

        if data.len() < Self::SERIALIZED_SIZE {
            return Err(format!(
                "Insufficient data for ContiguousRegion: expected {}, got {}",
                Self::SERIALIZED_SIZE,
                data.len()
            ));
        }

        Ok(Self {
            start_offset: read_u64(&data[0..8]),
            total_size: read_u64(&data[8..16]),
            cluster_count: read_u32(&data[16..20]),
            stride: read_u32(&data[20..24]),
        })
    }
}
489
490impl V2WALRecord {
491    /// Get the record type
492    pub fn record_type(&self) -> V2WALRecordType {
493        match self {
494            Self::NodeInsert { .. } => V2WALRecordType::NodeInsert,
495            Self::NodeUpdate { .. } => V2WALRecordType::NodeUpdate,
496            Self::NodeDelete { .. } => V2WALRecordType::NodeDelete,
497            Self::ClusterCreate { .. } => V2WALRecordType::ClusterCreate,
498            Self::EdgeInsert { .. } => V2WALRecordType::EdgeInsert,
499            Self::EdgeUpdate { .. } => V2WALRecordType::EdgeUpdate,
500            Self::EdgeDelete { .. } => V2WALRecordType::EdgeDelete,
501            Self::StringInsert { .. } => V2WALRecordType::StringInsert,
502            Self::FreeSpaceAllocate { .. } => V2WALRecordType::FreeSpaceAllocate,
503            Self::FreeSpaceDeallocate { .. } => V2WALRecordType::FreeSpaceDeallocate,
504            Self::TransactionBegin { .. } => V2WALRecordType::TransactionBegin,
505            Self::TransactionCommit { .. } => V2WALRecordType::TransactionCommit,
506            Self::TransactionRollback { .. } => V2WALRecordType::TransactionRollback,
507            Self::Checkpoint { .. } => V2WALRecordType::Checkpoint,
508            Self::HeaderUpdate { .. } => V2WALRecordType::HeaderUpdate,
509            Self::SegmentEnd { .. } => V2WALRecordType::SegmentEnd,
510            Self::TransactionPrepare { .. } => V2WALRecordType::TransactionPrepare,
511            Self::TransactionAbort { .. } => V2WALRecordType::TransactionAbort,
512            Self::SavepointCreate { .. } => V2WALRecordType::SavepointCreate,
513            Self::SavepointRollback { .. } => V2WALRecordType::SavepointRollback,
514            Self::SavepointRelease { .. } => V2WALRecordType::SavepointRelease,
515            Self::BackupCreate { .. } => V2WALRecordType::BackupCreate,
516            Self::BackupRestore { .. } => V2WALRecordType::BackupRestore,
517            Self::LockAcquire { .. } => V2WALRecordType::LockAcquire,
518            Self::LockRelease { .. } => V2WALRecordType::LockRelease,
519            Self::IndexUpdate { .. } => V2WALRecordType::IndexUpdate,
520            Self::StatisticsUpdate { .. } => V2WALRecordType::StatisticsUpdate,
521            Self::AllocateContiguous { .. } => V2WALRecordType::AllocateContiguous,
522            Self::CommitContiguous { .. } => V2WALRecordType::CommitContiguous,
523            Self::RollbackContiguous { .. } => V2WALRecordType::RollbackContiguous,
524            Self::KvSet { .. } => V2WALRecordType::KvSet,
525            Self::KvDelete { .. } => V2WALRecordType::KvDelete,
526        }
527    }
528
529    /// Get the cluster key for cluster-affinity logging (if applicable)
530    pub fn cluster_key(&self) -> Option<i64> {
531        match self {
532            Self::NodeInsert { node_id, .. } => Some(*node_id),
533            Self::NodeUpdate { node_id, .. } => Some(*node_id),
534            Self::NodeDelete { node_id, .. } => Some(*node_id),
535            Self::ClusterCreate { node_id, .. } => Some(*node_id),
536            Self::EdgeInsert {
537                cluster_key: (node_id, _),
538                ..
539            } => Some(*node_id),
540            Self::EdgeUpdate {
541                cluster_key: (node_id, _),
542                ..
543            } => Some(*node_id),
544            Self::EdgeDelete {
545                cluster_key: (node_id, _),
546                ..
547            } => Some(*node_id),
548            _ => None,
549        }
550    }
551
552    /// Estimate the serialized size of this record
553    pub fn serialized_size(&self) -> usize {
554        let base_size = std::mem::size_of::<V2WALRecordType>() + std::mem::size_of::<u32>(); // type + size field
555
556        match self {
557            Self::NodeInsert { node_data, .. } => base_size + 8 + 8 + 4 + node_data.len(),
558            Self::NodeUpdate {
559                old_data, new_data, ..
560            } => base_size + 8 + 8 + 4 + old_data.len() + 4 + new_data.len(),
561            Self::NodeDelete {
562                old_data,
563                outgoing_edges,
564                incoming_edges,
565                ..
566            } => {
567                let outgoing_size: usize = outgoing_edges.iter().map(|e| e.serialized_size()).sum();
568                let incoming_size: usize = incoming_edges.iter().map(|e| e.serialized_size()).sum();
569                base_size + 8 + 8 + 4 + old_data.len() + 4 + outgoing_size + 4 + incoming_size
570            }
571            Self::ClusterCreate { edge_data, .. } => base_size + 8 + 1 + 8 + 4 + edge_data.len(),
572            Self::EdgeInsert { edge_record, .. } => {
573                base_size + 8 + 1 + edge_record.serialized_size() + 4
574            }
575            Self::EdgeUpdate {
576                old_edge, new_edge, ..
577            } => base_size + 8 + 1 + old_edge.serialized_size() + new_edge.serialized_size() + 4,
578            Self::EdgeDelete { old_edge, .. } => base_size + 8 + 1 + old_edge.serialized_size() + 4,
579            Self::StringInsert { string_value, .. } => base_size + 4 + string_value.len(),
580            Self::FreeSpaceAllocate { .. } | Self::FreeSpaceDeallocate { .. } => {
581                base_size + 8 + 4 + 1
582            }
583            Self::TransactionBegin { .. }
584            | Self::TransactionCommit { .. }
585            | Self::TransactionRollback { .. } => base_size + 8 + 8,
586            Self::Checkpoint { .. } => base_size + 8 + 8,
587            Self::HeaderUpdate {
588                old_data, new_data, ..
589            } => base_size + 8 + old_data.len() + new_data.len(),
590            Self::SegmentEnd { .. } => base_size + 8 + 4,
591            Self::TransactionPrepare {
592                record_count: _, ..
593            } => base_size + 8 + 8 + 8,
594            Self::TransactionAbort { abort_reason, .. } => base_size + 8 + abort_reason.len(),
595            Self::SavepointCreate { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
596            Self::SavepointRollback { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
597            Self::SavepointRelease { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
598            Self::BackupCreate {
599                backup_id,
600                backup_path,
601                ..
602            } => base_size + backup_id.len() + backup_path.to_string_lossy().len(),
603            Self::BackupRestore {
604                backup_id,
605                backup_path,
606                target_path,
607                ..
608            } => {
609                base_size
610                    + backup_id.len()
611                    + backup_path.to_string_lossy().len()
612                    + target_path.to_string_lossy().len()
613            }
614            Self::LockAcquire { .. } | Self::LockRelease { .. } => base_size + 8 + 8 + 1,
615            Self::IndexUpdate { .. } | Self::StatisticsUpdate { .. } => base_size,
616            Self::AllocateContiguous { .. } => base_size + 8 + 24 + 8, // txn_id + region + timestamp
617            Self::CommitContiguous { .. } => base_size + 8 + 24,       // txn_id + region
618            Self::RollbackContiguous { .. } => base_size + 24,         // region
619            Self::KvSet {
620                key,
621                value_bytes,
622                ttl_seconds,
623                ..
624            } => {
625                base_size
626                    + 4
627                    + key.len()
628                    + 4
629                    + value_bytes.len()
630                    + 1
631                    + 1
632                    + (if ttl_seconds.is_some() { 8 } else { 0 })
633                    + 8
634            }
635            Self::KvDelete {
636                key,
637                old_value_bytes,
638                ..
639            } => {
640                base_size
641                    + 4
642                    + key.len()
643                    + 1
644                    + old_value_bytes.as_ref().map(|v| 4 + v.len()).unwrap_or(0)
645                    + 8
646            }
647        }
648    }
649
650    /// Check if this record modifies data (requires checkpointing)
651    pub fn modifies_data(&self) -> bool {
652        self.record_type().requires_checkpoint()
653    }
654
655    /// Check if this record is transaction control
656    pub fn is_transaction_control(&self) -> bool {
657        self.record_type().is_transaction_control()
658    }
659}
660
661/// WAL record serialization error
662#[derive(Debug, Clone)]
663pub enum WALSerializationError {
664    /// Invalid record type encountered
665    InvalidRecordType(u8),
666
667    /// Insufficient data for record deserialization
668    InsufficientData {
669        expected: usize,
670        actual: usize,
671        record_type: V2WALRecordType,
672    },
673
674    /// Data corruption detected
675    CorruptedData { location: String, details: String },
676
677    /// I/O error during serialization
678    IoError(String),
679
680    /// Size overflow in record data
681    SizeOverflow,
682}
683
684impl std::fmt::Display for WALSerializationError {
685    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
686        match self {
687            Self::InvalidRecordType(t) => write!(f, "Invalid WAL record type: {}", t),
688            Self::InsufficientData {
689                expected,
690                actual,
691                record_type,
692            } => {
693                write!(
694                    f,
695                    "Insufficient data for {:?}: expected {}, got {}",
696                    record_type, expected, actual
697                )
698            }
699            Self::CorruptedData { location, details } => {
700                write!(f, "Corrupted WAL data at {}: {}", location, details)
701            }
702            Self::IoError(msg) => write!(f, "I/O error during WAL serialization: {}", msg),
703            Self::SizeOverflow => write!(f, "Size overflow in WAL record data"),
704        }
705    }
706}
707
708impl std::error::Error for WALSerializationError {}
709
/// WAL record serializer/deserializer.
///
/// A unit struct with no state; it only groups the associated `serialize`
/// and `deserialize` functions that convert between [`V2WALRecord`] values
/// and their byte encoding.
pub struct V2WALSerializer;
712
713impl V2WALSerializer {
714    /// Serialize a WAL record to bytes
715    pub fn serialize(record: &V2WALRecord) -> NativeResult<Vec<u8>> {
716        let mut buffer = Vec::with_capacity(record.serialized_size());
717
718        // Write record type
719        buffer.push(record.record_type() as u8);
720
721        // Write placeholder for record size
722        let size_pos = buffer.len();
723        buffer.extend_from_slice(&[0u8; 4]);
724
725        let data_start = buffer.len();
726
727        // Serialize record-specific data
728        match record {
729            V2WALRecord::NodeInsert {
730                node_id,
731                slot_offset,
732                node_data,
733            } => {
734                buffer.extend_from_slice(&node_id.to_le_bytes());
735                buffer.extend_from_slice(&slot_offset.to_le_bytes());
736                buffer.extend_from_slice(&(node_data.len() as u32).to_le_bytes());
737                buffer.extend_from_slice(node_data);
738            }
739
740            V2WALRecord::NodeUpdate {
741                node_id,
742                slot_offset,
743                old_data,
744                new_data,
745            } => {
746                buffer.extend_from_slice(&node_id.to_le_bytes());
747                buffer.extend_from_slice(&slot_offset.to_le_bytes());
748                buffer.extend_from_slice(&(old_data.len() as u32).to_le_bytes());
749                buffer.extend_from_slice(old_data);
750                buffer.extend_from_slice(&(new_data.len() as u32).to_le_bytes());
751                buffer.extend_from_slice(new_data);
752            }
753
754            V2WALRecord::NodeDelete {
755                node_id,
756                slot_offset,
757                old_data,
758                outgoing_edges,
759                incoming_edges,
760            } => {
761                buffer.extend_from_slice(&node_id.to_le_bytes());
762                buffer.extend_from_slice(&slot_offset.to_le_bytes());
763                buffer.extend_from_slice(&(old_data.len() as u32).to_le_bytes());
764                buffer.extend_from_slice(old_data);
765
766                // Serialize outgoing edges
767                buffer.extend_from_slice(&(outgoing_edges.len() as u32).to_le_bytes());
768                for edge in outgoing_edges {
769                    buffer.extend_from_slice(&edge.serialize());
770                }
771
772                // Serialize incoming edges
773                buffer.extend_from_slice(&(incoming_edges.len() as u32).to_le_bytes());
774                for edge in incoming_edges {
775                    buffer.extend_from_slice(&edge.serialize());
776                }
777            }
778
779            V2WALRecord::ClusterCreate {
780                node_id,
781                direction,
782                cluster_offset,
783                cluster_size,
784                edge_data,
785            } => {
786                buffer.extend_from_slice(&node_id.to_le_bytes());
787                buffer.push(*direction as u8);
788                buffer.extend_from_slice(&cluster_offset.to_le_bytes());
789                buffer.extend_from_slice(&cluster_size.to_le_bytes());
790                buffer.extend_from_slice(&(edge_data.len() as u32).to_le_bytes());
791                buffer.extend_from_slice(edge_data);
792            }
793
794            V2WALRecord::EdgeInsert {
795                cluster_key,
796                edge_record,
797                insertion_point,
798            } => {
799                buffer.extend_from_slice(&cluster_key.0.to_le_bytes());
800                buffer.push(cluster_key.1 as u8);
801                buffer.extend_from_slice(&edge_record.as_bytes());
802                buffer.extend_from_slice(&insertion_point.to_le_bytes());
803            }
804
805            V2WALRecord::TransactionBegin { tx_id, timestamp } => {
806                buffer.extend_from_slice(&tx_id.to_le_bytes());
807                buffer.extend_from_slice(&timestamp.to_le_bytes());
808            }
809
810            V2WALRecord::TransactionCommit { tx_id, timestamp } => {
811                buffer.extend_from_slice(&tx_id.to_le_bytes());
812                buffer.extend_from_slice(&timestamp.to_le_bytes());
813            }
814
815            V2WALRecord::TransactionRollback { tx_id, timestamp } => {
816                buffer.extend_from_slice(&tx_id.to_le_bytes());
817                buffer.extend_from_slice(&timestamp.to_le_bytes());
818            }
819
820            V2WALRecord::AllocateContiguous {
821                txn_id,
822                region,
823                timestamp,
824            } => {
825                buffer.extend_from_slice(&txn_id.to_le_bytes());
826                let region_bytes = region.serialize();
827                buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
828                buffer.extend_from_slice(&region_bytes);
829                buffer.extend_from_slice(&timestamp.to_le_bytes());
830            }
831
832            V2WALRecord::CommitContiguous { txn_id, region } => {
833                buffer.extend_from_slice(&txn_id.to_le_bytes());
834                let region_bytes = region.serialize();
835                buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
836                buffer.extend_from_slice(&region_bytes);
837            }
838
839            V2WALRecord::RollbackContiguous { region } => {
840                let region_bytes = region.serialize();
841                buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
842                buffer.extend_from_slice(&region_bytes);
843            }
844
845            V2WALRecord::KvSet {
846                key,
847                value_bytes,
848                value_type,
849                ttl_seconds,
850                version,
851            } => {
852                // Serialize KvSet record
853                buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
854                buffer.extend_from_slice(key);
855                buffer.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
856                buffer.extend_from_slice(value_bytes);
857                buffer.push(*value_type);
858                // Serialize TTL option
859                match ttl_seconds {
860                    Some(ttl) => {
861                        buffer.push(1); // Some
862                        buffer.extend_from_slice(&ttl.to_le_bytes());
863                    }
864                    None => {
865                        buffer.push(0); // None
866                    }
867                }
868                buffer.extend_from_slice(&version.to_le_bytes());
869            }
870
871            V2WALRecord::KvDelete {
872                key,
873                old_value_bytes,
874                old_value_type,
875                old_version,
876            } => {
877                // Serialize KvDelete record
878                buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
879                buffer.extend_from_slice(key);
880                buffer.push(*old_value_type);
881                // Serialize old_value option
882                match old_value_bytes {
883                    Some(value) => {
884                        buffer.push(1); // Some
885                        buffer.extend_from_slice(&(value.len() as u32).to_le_bytes());
886                        buffer.extend_from_slice(value);
887                    }
888                    None => {
889                        buffer.push(0); // None
890                    }
891                }
892                buffer.extend_from_slice(&old_version.to_le_bytes());
893            }
894
895            // Implement other record types similarly...
896            _ => {
897                return Err(NativeBackendError::CorruptStringTable {
898                    reason: format!(
899                        "WAL serialization error - unsupported record type: {:?}",
900                        record.record_type()
901                    ),
902                });
903            }
904        }
905
906        // Write the actual record size
907        let record_size = buffer.len() - data_start;
908        let size_bytes = (record_size as u32).to_le_bytes();
909        buffer[size_pos..size_pos + 4].copy_from_slice(&size_bytes);
910
911        Ok(buffer)
912    }
913
914    /// Deserialize a WAL record from bytes
915    pub fn deserialize(data: &[u8]) -> NativeResult<V2WALRecord> {
916        if data.is_empty() {
917            return Err(NativeBackendError::CorruptStringTable {
918                reason: "WAL deserialization error - empty data buffer".to_string(),
919            });
920        }
921
922        let record_type = V2WALRecordType::try_from(data[0])?;
923
924        if data.len() < 5 {
925            return Err(NativeBackendError::CorruptStringTable {
926                reason: "WAL deserialization error - insufficient data for record size".to_string(),
927            });
928        }
929
930        let record_size = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
931
932        if data.len() < 5 + record_size {
933            return Err(NativeBackendError::CorruptStringTable {
934                reason: format!(
935                    "WAL deserialization error - insufficient data: expected {}, got {}",
936                    record_size + 5,
937                    data.len()
938                ),
939            });
940        }
941
942        let record_data = &data[5..5 + record_size];
943
944        // Deserialize based on record type
945        match record_type {
946            V2WALRecordType::NodeInsert => {
947                if record_data.len() < 16 {
948                    return Err(NativeBackendError::CorruptStringTable {
949                        reason: "NodeInsert deserialization error - insufficient data for header"
950                            .to_string(),
951                    });
952                }
953
954                let node_id = i64::from_le_bytes(record_data[0..8].try_into().unwrap());
955                let slot_offset = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
956
957                if record_data.len() < 20 {
958                    return Err(NativeBackendError::CorruptStringTable {
959                        reason:
960                            "NodeInsert deserialization error - insufficient data for size field"
961                                .to_string(),
962                    });
963                }
964
965                let data_len = u32::from_le_bytes(record_data[16..20].try_into().unwrap()) as usize;
966
967                if record_data.len() < 20 + data_len {
968                    return Err(NativeBackendError::CorruptStringTable {
969                        reason:
970                            "NodeInsert deserialization error - insufficient data for node data"
971                                .to_string(),
972                    });
973                }
974
975                let node_data = record_data[20..20 + data_len].to_vec();
976
977                Ok(V2WALRecord::NodeInsert {
978                    node_id,
979                    slot_offset,
980                    node_data,
981                })
982            }
983
984            V2WALRecordType::NodeDelete => {
985                if record_data.len() < 16 {
986                    return Err(NativeBackendError::CorruptStringTable {
987                        reason: "NodeDelete deserialization error - insufficient data for header"
988                            .to_string(),
989                    });
990                }
991
992                let node_id = i64::from_le_bytes(record_data[0..8].try_into().unwrap());
993                let slot_offset = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
994
995                if record_data.len() < 20 {
996                    return Err(NativeBackendError::CorruptStringTable {
997                        reason:
998                            "NodeDelete deserialization error - insufficient data for size field"
999                                .to_string(),
1000                    });
1001                }
1002
1003                let data_len = u32::from_le_bytes(record_data[16..20].try_into().unwrap()) as usize;
1004
1005                if record_data.len() < 20 + data_len {
1006                    return Err(NativeBackendError::CorruptStringTable {
1007                        reason:
1008                            "NodeDelete deserialization error - insufficient data for node data"
1009                                .to_string(),
1010                    });
1011                }
1012
1013                let old_data = record_data[20..20 + data_len].to_vec();
1014                let mut offset = 20 + data_len;
1015
1016                // Read outgoing edges
1017                if record_data.len() < offset + 4 {
1018                    return Err(NativeBackendError::CorruptStringTable {
1019                        reason: "NodeDelete deserialization error - insufficient data for outgoing edge count".to_string(),
1020                    });
1021                }
1022                let outgoing_count =
1023                    u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
1024                        as usize;
1025                offset += 4;
1026
1027                let mut outgoing_edges = Vec::with_capacity(outgoing_count);
1028                for _ in 0..outgoing_count {
1029                    if record_data.len() < offset + 12 {
1030                        return Err(NativeBackendError::CorruptStringTable {
1031                            reason: "NodeDelete deserialization error - insufficient data for outgoing edge header".to_string(),
1032                        });
1033                    }
1034
1035                    // CompactEdgeRecord layout: neighbor_id (8) + type_offset (2) + data_len (2) = 12 bytes min
1036                    let edge_data_len = u16::from_be_bytes(
1037                        record_data[offset + 10..offset + 12].try_into().unwrap(),
1038                    ) as usize;
1039                    let edge_total_len = 12 + edge_data_len;
1040
1041                    if record_data.len() < offset + edge_total_len {
1042                        return Err(NativeBackendError::CorruptStringTable {
1043                            reason: "NodeDelete deserialization error - insufficient data for outgoing edge".to_string(),
1044                        });
1045                    }
1046
1047                    let edge_bytes = &record_data[offset..offset + edge_total_len];
1048                    match CompactEdgeRecord::deserialize(edge_bytes) {
1049                        Ok(edge) => outgoing_edges.push(edge),
1050                        Err(e) => {
1051                            return Err(NativeBackendError::CorruptStringTable {
1052                                reason: format!(
1053                                    "NodeDelete deserialization error - failed to deserialize outgoing edge: {:?}",
1054                                    e
1055                                ),
1056                            });
1057                        }
1058                    }
1059                    offset += edge_total_len;
1060                }
1061
1062                // Read incoming edges
1063                if record_data.len() < offset + 4 {
1064                    return Err(NativeBackendError::CorruptStringTable {
1065                        reason: "NodeDelete deserialization error - insufficient data for incoming edge count".to_string(),
1066                    });
1067                }
1068                let incoming_count =
1069                    u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
1070                        as usize;
1071                offset += 4;
1072
1073                let mut incoming_edges = Vec::with_capacity(incoming_count);
1074                for _ in 0..incoming_count {
1075                    if record_data.len() < offset + 12 {
1076                        return Err(NativeBackendError::CorruptStringTable {
1077                            reason: "NodeDelete deserialization error - insufficient data for incoming edge header".to_string(),
1078                        });
1079                    }
1080
1081                    let edge_data_len = u16::from_be_bytes(
1082                        record_data[offset + 10..offset + 12].try_into().unwrap(),
1083                    ) as usize;
1084                    let edge_total_len = 12 + edge_data_len;
1085
1086                    if record_data.len() < offset + edge_total_len {
1087                        return Err(NativeBackendError::CorruptStringTable {
1088                            reason: "NodeDelete deserialization error - insufficient data for incoming edge".to_string(),
1089                        });
1090                    }
1091
1092                    let edge_bytes = &record_data[offset..offset + edge_total_len];
1093                    match CompactEdgeRecord::deserialize(edge_bytes) {
1094                        Ok(edge) => incoming_edges.push(edge),
1095                        Err(e) => {
1096                            return Err(NativeBackendError::CorruptStringTable {
1097                                reason: format!(
1098                                    "NodeDelete deserialization error - failed to deserialize incoming edge: {:?}",
1099                                    e
1100                                ),
1101                            });
1102                        }
1103                    }
1104                    offset += edge_total_len;
1105                }
1106
1107                Ok(V2WALRecord::NodeDelete {
1108                    node_id,
1109                    slot_offset,
1110                    old_data,
1111                    outgoing_edges,
1112                    incoming_edges,
1113                })
1114            }
1115
1116            V2WALRecordType::TransactionBegin => {
1117                if record_data.len() < 16 {
1118                    return Err(NativeBackendError::CorruptStringTable {
1119                        reason: "TransactionBegin deserialization error - insufficient data"
1120                            .to_string(),
1121                    });
1122                }
1123
1124                let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1125                let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
1126
1127                Ok(V2WALRecord::TransactionBegin { tx_id, timestamp })
1128            }
1129
1130            V2WALRecordType::TransactionCommit => {
1131                if record_data.len() < 16 {
1132                    return Err(NativeBackendError::CorruptStringTable {
1133                        reason: "TransactionCommit deserialization error - insufficient data"
1134                            .to_string(),
1135                    });
1136                }
1137
1138                let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1139                let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
1140
1141                Ok(V2WALRecord::TransactionCommit { tx_id, timestamp })
1142            }
1143
1144            V2WALRecordType::TransactionRollback => {
1145                if record_data.len() < 16 {
1146                    return Err(NativeBackendError::CorruptStringTable {
1147                        reason: "TransactionRollback deserialization error - insufficient data"
1148                            .to_string(),
1149                    });
1150                }
1151
1152                let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1153                let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
1154
1155                Ok(V2WALRecord::TransactionRollback { tx_id, timestamp })
1156            }
1157
1158            V2WALRecordType::AllocateContiguous => {
1159                if record_data.len() < 40 {
1160                    return Err(NativeBackendError::CorruptStringTable {
1161                        reason: format!(
1162                            "AllocateContiguous deserialization error - insufficient data: expected 40, got {}",
1163                            record_data.len()
1164                        ),
1165                    });
1166                }
1167
1168                let txn_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1169                let region_len =
1170                    u32::from_le_bytes(record_data[8..12].try_into().unwrap()) as usize;
1171                let region_bytes = &record_data[12..12 + region_len];
1172                let timestamp = u64::from_le_bytes(
1173                    record_data[12 + region_len..20 + region_len]
1174                        .try_into()
1175                        .unwrap(),
1176                );
1177
1178                let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
1179                    NativeBackendError::CorruptStringTable {
1180                        reason: format!(
1181                            "AllocateContiguous deserialization error - invalid region: {}",
1182                            e
1183                        ),
1184                    }
1185                })?;
1186
1187                Ok(V2WALRecord::AllocateContiguous {
1188                    txn_id,
1189                    region,
1190                    timestamp,
1191                })
1192            }
1193
1194            V2WALRecordType::CommitContiguous => {
1195                if record_data.len() < 12 {
1196                    return Err(NativeBackendError::CorruptStringTable {
1197                        reason: "CommitContiguous deserialization error - insufficient data"
1198                            .to_string(),
1199                    });
1200                }
1201
1202                let txn_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1203                let region_len =
1204                    u32::from_le_bytes(record_data[8..12].try_into().unwrap()) as usize;
1205
1206                if record_data.len() < 12 + region_len {
1207                    return Err(NativeBackendError::CorruptStringTable {
1208                        reason:
1209                            "CommitContiguous deserialization error - insufficient data for region"
1210                                .to_string(),
1211                    });
1212                }
1213
1214                let region_bytes = &record_data[12..12 + region_len];
1215                let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
1216                    NativeBackendError::CorruptStringTable {
1217                        reason: format!(
1218                            "CommitContiguous deserialization error - invalid region: {}",
1219                            e
1220                        ),
1221                    }
1222                })?;
1223
1224                Ok(V2WALRecord::CommitContiguous { txn_id, region })
1225            }
1226
1227            V2WALRecordType::RollbackContiguous => {
1228                if record_data.len() < 4 {
1229                    return Err(NativeBackendError::CorruptStringTable {
1230                        reason: "RollbackContiguous deserialization error - insufficient data"
1231                            .to_string(),
1232                    });
1233                }
1234
1235                let region_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;
1236
1237                if record_data.len() < 4 + region_len {
1238                    return Err(NativeBackendError::CorruptStringTable {
1239                        reason: "RollbackContiguous deserialization error - insufficient data for region".to_string(),
1240                    });
1241                }
1242
1243                let region_bytes = &record_data[4..4 + region_len];
1244                let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
1245                    NativeBackendError::CorruptStringTable {
1246                        reason: format!(
1247                            "RollbackContiguous deserialization error - invalid region: {}",
1248                            e
1249                        ),
1250                    }
1251                })?;
1252
1253                Ok(V2WALRecord::RollbackContiguous { region })
1254            }
1255
1256            V2WALRecordType::KvSet => {
1257                // Deserialize KvSet record: key_len(4) + key + value_len(4) + value + type(1) + ttl_flag(1) + [ttl(8)] + version(8)
1258                if record_data.len() < 4 {
1259                    return Err(NativeBackendError::CorruptStringTable {
1260                        reason: "KvSet deserialization error - insufficient data for key length"
1261                            .to_string(),
1262                    });
1263                }
1264
1265                let key_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;
1266
1267                if record_data.len() < 4 + key_len + 4 {
1268                    return Err(NativeBackendError::CorruptStringTable {
1269                        reason: "KvSet deserialization error - insufficient data for key and value length".to_string(),
1270                    });
1271                }
1272
1273                let key = record_data[4..4 + key_len].to_vec();
1274                let offset = 4 + key_len;
1275
1276                let value_len =
1277                    u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
1278                        as usize;
1279
1280                if record_data.len() < offset + 4 + value_len + 1 + 1 + 8 {
1281                    return Err(NativeBackendError::CorruptStringTable {
1282                        reason:
1283                            "KvSet deserialization error - insufficient data for value and metadata"
1284                                .to_string(),
1285                    });
1286                }
1287
1288                let value_bytes = record_data[offset + 4..offset + 4 + value_len].to_vec();
1289                let offset = offset + 4 + value_len;
1290
1291                let value_type = record_data[offset];
1292                let offset = offset + 1;
1293
1294                let ttl_flag = record_data[offset];
1295                let ttl_seconds = if ttl_flag == 1 {
1296                    if record_data.len() < offset + 1 + 8 {
1297                        return Err(NativeBackendError::CorruptStringTable {
1298                            reason: "KvSet deserialization error - insufficient data for TTL"
1299                                .to_string(),
1300                        });
1301                    }
1302                    let ttl = u64::from_le_bytes(
1303                        record_data[offset + 1..offset + 1 + 8].try_into().unwrap(),
1304                    );
1305                    Some(ttl)
1306                } else {
1307                    None
1308                };
1309                let offset = offset + 1 + ttl_seconds.map_or(0, |_| 8);
1310
1311                if record_data.len() < offset + 8 {
1312                    return Err(NativeBackendError::CorruptStringTable {
1313                        reason: "KvSet deserialization error - insufficient data for version"
1314                            .to_string(),
1315                    });
1316                }
1317
1318                let version =
1319                    u64::from_le_bytes(record_data[offset..offset + 8].try_into().unwrap());
1320
1321                Ok(V2WALRecord::KvSet {
1322                    key,
1323                    value_bytes,
1324                    value_type,
1325                    ttl_seconds,
1326                    version,
1327                })
1328            }
1329
1330            V2WALRecordType::KvDelete => {
1331                // Deserialize KvDelete record: key_len(4) + key + type(1) + old_value_flag(1) + [len(4) + old_value] + old_version(8)
1332                if record_data.len() < 4 {
1333                    return Err(NativeBackendError::CorruptStringTable {
1334                        reason: "KvDelete deserialization error - insufficient data for key length"
1335                            .to_string(),
1336                    });
1337                }
1338
1339                let key_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;
1340
1341                if record_data.len() < 4 + key_len + 1 {
1342                    return Err(NativeBackendError::CorruptStringTable {
1343                        reason:
1344                            "KvDelete deserialization error - insufficient data for key and type"
1345                                .to_string(),
1346                    });
1347                }
1348
1349                let key = record_data[4..4 + key_len].to_vec();
1350                let offset = 4 + key_len;
1351
1352                let old_value_type = record_data[offset];
1353                let offset = offset + 1;
1354
1355                if record_data.len() < offset + 1 {
1356                    return Err(NativeBackendError::CorruptStringTable {
1357                        reason:
1358                            "KvDelete deserialization error - insufficient data for old value flag"
1359                                .to_string(),
1360                    });
1361                }
1362
1363                let old_value_flag = record_data[offset];
1364                let old_value_bytes = if old_value_flag == 1 {
1365                    if record_data.len() < offset + 1 + 4 {
1366                        return Err(NativeBackendError::CorruptStringTable {
1367                            reason: "KvDelete deserialization error - insufficient data for old value length".to_string(),
1368                        });
1369                    }
1370                    let old_value_len = u32::from_le_bytes(
1371                        record_data[offset + 1..offset + 1 + 4].try_into().unwrap(),
1372                    ) as usize;
1373
1374                    if record_data.len() < offset + 1 + 4 + old_value_len {
1375                        return Err(NativeBackendError::CorruptStringTable {
1376                            reason:
1377                                "KvDelete deserialization error - insufficient data for old value"
1378                                    .to_string(),
1379                        });
1380                    }
1381
1382                    let value =
1383                        record_data[offset + 1 + 4..offset + 1 + 4 + old_value_len].to_vec();
1384                    Some(value)
1385                } else {
1386                    None
1387                };
1388                let offset = offset + 1 + old_value_bytes.as_ref().map_or(0, |v| 4 + v.len());
1389
1390                if record_data.len() < offset + 8 {
1391                    return Err(NativeBackendError::CorruptStringTable {
1392                        reason:
1393                            "KvDelete deserialization error - insufficient data for old version"
1394                                .to_string(),
1395                    });
1396                }
1397
1398                let old_version =
1399                    u64::from_le_bytes(record_data[offset..offset + 8].try_into().unwrap());
1400
1401                Ok(V2WALRecord::KvDelete {
1402                    key,
1403                    old_value_bytes,
1404                    old_value_type,
1405                    old_version,
1406                })
1407            }
1408
1409            // Implement other record types similarly...
1410            _ => Err(NativeBackendError::CorruptStringTable {
1411                reason: format!(
1412                    "WAL deserialization error - unsupported record type: {:?}",
1413                    record_type
1414                ),
1415            }),
1416        }
1417    }
1418}
1419
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `record` and immediately decodes the produced bytes again.
    fn roundtrip(record: &V2WALRecord) -> V2WALRecord {
        let bytes = V2WALSerializer::serialize(record).unwrap();
        V2WALSerializer::deserialize(&bytes).unwrap()
    }

    /// Record types report the expected checkpoint / transaction-control flags.
    #[test]
    fn test_record_type_properties() {
        let insert = V2WALRecordType::NodeInsert;
        let begin = V2WALRecordType::TransactionBegin;

        assert!(insert.requires_checkpoint());
        assert!(begin.is_transaction_control());
        assert!(!begin.requires_checkpoint());
    }

    /// Node records expose their node id as cluster key; transaction markers have none.
    #[test]
    fn test_v2_wal_record_cluster_key() {
        let node_record = V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        };
        assert_eq!(node_record.cluster_key(), Some(42));

        let tx_record = V2WALRecord::TransactionBegin {
            tx_id: 100,
            timestamp: 123456,
        };
        assert_eq!(tx_record.cluster_key(), None);
    }

    /// A NodeInsert record survives a serialize/deserialize cycle unchanged.
    #[test]
    fn test_record_serialization_roundtrip() {
        let expected_id = 123;
        let expected_offset = 4096;
        let expected_data = vec![4, 5, 6, 7, 8];

        let original = V2WALRecord::NodeInsert {
            node_id: expected_id,
            slot_offset: expected_offset,
            node_data: expected_data.clone(),
        };

        match roundtrip(&original) {
            V2WALRecord::NodeInsert {
                node_id,
                slot_offset,
                node_data,
            } => {
                assert_eq!(expected_id, node_id);
                assert_eq!(expected_offset, slot_offset);
                assert_eq!(expected_data, node_data);
            }
            _ => panic!("Record type mismatch after roundtrip"),
        }
    }

    /// The size estimate is an upper bound for the real serialized length.
    #[test]
    fn test_serialized_size_estimation() {
        let record = V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3, 4, 5],
        };

        let actual_len = V2WALSerializer::serialize(&record).unwrap().len();

        // Estimated size should be >= actual serialized size
        assert!(record.serialized_size() >= actual_len);
    }

    /// A KvSet record with a TTL survives a serialize/deserialize cycle unchanged.
    #[test]
    fn test_kv_set_serialization_roundtrip() {
        let expected_key = b"test_key".to_vec();
        let expected_value = b"test_value".to_vec();
        let expected_type = 1;
        let expected_ttl = Some(3600);
        let expected_version = 12345;

        let original = V2WALRecord::KvSet {
            key: expected_key.clone(),
            value_bytes: expected_value.clone(),
            value_type: expected_type,
            ttl_seconds: expected_ttl,
            version: expected_version,
        };

        match roundtrip(&original) {
            V2WALRecord::KvSet {
                key,
                value_bytes,
                value_type,
                ttl_seconds,
                version,
            } => {
                assert_eq!(expected_key, key);
                assert_eq!(expected_value, value_bytes);
                assert_eq!(expected_type, value_type);
                assert_eq!(expected_ttl, ttl_seconds);
                assert_eq!(expected_version, version);
            }
            _ => panic!("Record type mismatch after KV set roundtrip"),
        }
    }

    /// A KvDelete record carrying the old value survives a roundtrip unchanged.
    #[test]
    fn test_kv_delete_serialization_roundtrip() {
        let expected_key = b"test_key".to_vec();
        let expected_old_value = Some(b"old_value".to_vec());
        let expected_type = 1;
        let expected_version = 12344;

        let original = V2WALRecord::KvDelete {
            key: expected_key.clone(),
            old_value_bytes: expected_old_value.clone(),
            old_value_type: expected_type,
            old_version: expected_version,
        };

        match roundtrip(&original) {
            V2WALRecord::KvDelete {
                key,
                old_value_bytes,
                old_value_type,
                old_version,
            } => {
                assert_eq!(expected_key, key);
                assert_eq!(expected_old_value, old_value_bytes);
                assert_eq!(expected_type, old_value_type);
                assert_eq!(expected_version, old_version);
            }
            _ => panic!("Record type mismatch after KV delete roundtrip"),
        }
    }

    /// A KvSet record without TTL decodes with `ttl_seconds == None`.
    #[test]
    fn test_kv_set_without_ttl() {
        let original = V2WALRecord::KvSet {
            key: b"no_ttl_key".to_vec(),
            value_bytes: b"value".to_vec(),
            value_type: 0,
            ttl_seconds: None,
            version: 1,
        };

        if let V2WALRecord::KvSet { ttl_seconds, .. } = roundtrip(&original) {
            assert_eq!(ttl_seconds, None);
        } else {
            panic!("Wrong record type");
        }
    }

    /// A KvDelete record without an old value decodes with `old_value_bytes == None`.
    #[test]
    fn test_kv_delete_no_old_value() {
        let original = V2WALRecord::KvDelete {
            key: b"no_old_value".to_vec(),
            old_value_bytes: None,
            old_value_type: 0,
            old_version: 0,
        };

        if let V2WALRecord::KvDelete {
            old_value_bytes, ..
        } = roundtrip(&original)
        {
            assert_eq!(old_value_bytes, None);
        } else {
            panic!("Wrong record type");
        }
    }
}