1use crate::backend::native::v2::edge_cluster::{CompactEdgeRecord, Direction};
8use crate::backend::native::{NativeBackendError, NativeResult};
/// Tag byte identifying each record kind in the v2 write-ahead log.
///
/// The explicit discriminants are the on-disk encoding (decoded by the
/// `TryFrom<u8>` impl below), so existing values must never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum V2WALRecordType {
    /// New node slot written.
    NodeInsert = 1,

    /// Existing node slot rewritten.
    NodeUpdate = 2,

    /// Node slot removed.
    NodeDelete = 3,

    /// New edge cluster allocated.
    ClusterCreate = 4,

    /// Edge added to a cluster.
    EdgeInsert = 5,

    /// Edge within a cluster rewritten.
    EdgeUpdate = 6,

    /// Edge removed from a cluster.
    EdgeDelete = 7,

    /// String added to the string table.
    StringInsert = 8,

    /// Free-space block allocated.
    FreeSpaceAllocate = 9,

    /// Free-space block released.
    FreeSpaceDeallocate = 10,

    /// Transaction started.
    TransactionBegin = 11,

    /// Transaction committed.
    TransactionCommit = 12,

    /// Transaction rolled back.
    TransactionRollback = 13,

    /// Checkpoint marker carrying the checkpointed LSN.
    Checkpoint = 14,

    /// File header rewritten.
    HeaderUpdate = 15,

    /// End-of-segment marker with checksum.
    SegmentEnd = 16,

    /// Transaction prepare marker.
    TransactionPrepare = 17,

    /// Transaction aborted with a reason string.
    TransactionAbort = 18,

    /// Savepoint created inside a transaction.
    SavepointCreate = 19,

    /// Rollback to a named savepoint.
    SavepointRollback = 20,

    /// Named savepoint released.
    SavepointRelease = 21,

    /// Backup created.
    BackupCreate = 22,

    /// Backup restored.
    BackupRestore = 23,

    /// Lock acquired on a resource.
    LockAcquire = 24,

    /// Lock released.
    LockRelease = 25,

    /// Index modification.
    IndexUpdate = 26,

    /// Statistics modification.
    StatisticsUpdate = 27,

    /// Contiguous region reserved (staged under a transaction).
    AllocateContiguous = 28,

    /// Staged contiguous region made permanent.
    CommitContiguous = 29,

    /// Staged contiguous region discarded.
    RollbackContiguous = 30,

    /// Key-value pair written.
    KvSet = 31,

    /// Key-value pair deleted.
    KvDelete = 32,
}
109
110impl V2WALRecordType {
111 pub fn data_modifying() -> &'static [V2WALRecordType] {
113 &[
114 Self::NodeInsert,
115 Self::NodeUpdate,
116 Self::NodeDelete,
117 Self::ClusterCreate,
118 Self::EdgeInsert,
119 Self::EdgeUpdate,
120 Self::EdgeDelete,
121 Self::StringInsert,
122 Self::FreeSpaceAllocate,
123 Self::FreeSpaceDeallocate,
124 Self::HeaderUpdate,
125 Self::AllocateContiguous,
126 Self::CommitContiguous,
127 Self::RollbackContiguous,
128 Self::KvSet,
129 Self::KvDelete,
130 ]
131 }
132
133 pub fn transaction_control() -> &'static [V2WALRecordType] {
135 &[
136 Self::TransactionBegin,
137 Self::TransactionCommit,
138 Self::TransactionRollback,
139 ]
140 }
141
142 pub fn requires_checkpoint(&self) -> bool {
144 Self::data_modifying().contains(self)
145 }
146
147 pub fn is_transaction_control(&self) -> bool {
149 Self::transaction_control().contains(self)
150 }
151}
152
153impl TryFrom<u8> for V2WALRecordType {
154 type Error = NativeBackendError;
155
156 fn try_from(value: u8) -> NativeResult<Self> {
157 match value {
158 1 => Ok(Self::NodeInsert),
159 2 => Ok(Self::NodeUpdate),
160 3 => Ok(Self::NodeDelete),
161 4 => Ok(Self::ClusterCreate),
162 5 => Ok(Self::EdgeInsert),
163 6 => Ok(Self::EdgeUpdate),
164 7 => Ok(Self::EdgeDelete),
165 8 => Ok(Self::StringInsert),
166 9 => Ok(Self::FreeSpaceAllocate),
167 10 => Ok(Self::FreeSpaceDeallocate),
168 11 => Ok(Self::TransactionBegin),
169 12 => Ok(Self::TransactionCommit),
170 13 => Ok(Self::TransactionRollback),
171 14 => Ok(Self::Checkpoint),
172 15 => Ok(Self::HeaderUpdate),
173 16 => Ok(Self::SegmentEnd),
174 17 => Ok(Self::TransactionPrepare),
175 18 => Ok(Self::TransactionAbort),
176 19 => Ok(Self::SavepointCreate),
177 20 => Ok(Self::SavepointRollback),
178 21 => Ok(Self::SavepointRelease),
179 22 => Ok(Self::BackupCreate),
180 23 => Ok(Self::BackupRestore),
181 24 => Ok(Self::LockAcquire),
182 25 => Ok(Self::LockRelease),
183 26 => Ok(Self::IndexUpdate),
184 27 => Ok(Self::StatisticsUpdate),
185 28 => Ok(Self::AllocateContiguous),
186 29 => Ok(Self::CommitContiguous),
187 30 => Ok(Self::RollbackContiguous),
188 31 => Ok(Self::KvSet),
189 32 => Ok(Self::KvDelete),
190 _ => Err(NativeBackendError::CorruptStringTable {
191 reason: format!("unknown WAL record type: {}", value),
192 }),
193 }
194 }
195}
196
/// A single logical entry in the v2 write-ahead log.
///
/// Each variant maps 1:1 to a `V2WALRecordType` tag (see `record_type()`).
/// Update/delete variants carry the previous byte images alongside the new
/// ones. NOTE(review): earlier variants use raw `u64` timestamps while the
/// later ones use `std::time::SystemTime` — consider unifying.
#[derive(Debug, Clone)]
pub enum V2WALRecord {
    /// A node written into the slot at `slot_offset`.
    NodeInsert {
        node_id: i64,
        // File offset of the slot receiving the node bytes.
        slot_offset: u64,
        // Serialized node payload.
        node_data: Vec<u8>,
    },

    /// A node slot rewritten in place; carries both byte images.
    NodeUpdate {
        node_id: i64,
        slot_offset: u64,
        // Node bytes before the update.
        old_data: Vec<u8>,
        // Node bytes after the update.
        new_data: Vec<u8>,
    },

    /// A node removed, together with its prior bytes and attached edges.
    NodeDelete {
        node_id: i64,
        slot_offset: u64,
        // Node bytes at deletion time.
        old_data: Vec<u8>,
        // Edges that originated from the deleted node.
        outgoing_edges: Vec<CompactEdgeRecord>,
        // Edges that pointed at the deleted node.
        incoming_edges: Vec<CompactEdgeRecord>,
    },

    /// A new edge cluster created for `(node_id, direction)`.
    ClusterCreate {
        node_id: i64,
        direction: Direction,
        // File offset of the cluster.
        cluster_offset: u64,
        // Cluster size in bytes.
        cluster_size: u32,
        // Initial serialized edge contents.
        edge_data: Vec<u8>,
    },

    /// An edge inserted into an existing cluster.
    EdgeInsert {
        // (node id, direction) identifying the target cluster.
        cluster_key: (i64, Direction),
        edge_record: CompactEdgeRecord,
        // Index within the cluster where the edge was inserted.
        insertion_point: u32,
    },

    /// An edge within a cluster rewritten; carries old and new records.
    EdgeUpdate {
        cluster_key: (i64, Direction),
        old_edge: CompactEdgeRecord,
        new_edge: CompactEdgeRecord,
        // Index of the edge within the cluster.
        position: u32,
    },

    /// An edge removed from a cluster; carries the removed record.
    EdgeDelete {
        cluster_key: (i64, Direction),
        old_edge: CompactEdgeRecord,
        position: u32,
    },

    /// A string added to the string table under `string_id`.
    StringInsert {
        string_id: u32,
        string_value: String,
    },

    /// A free-space block handed out by the allocator.
    FreeSpaceAllocate {
        block_offset: u64,
        block_size: u32,
        block_type: u8,
    },

    /// A block returned to the free-space allocator.
    FreeSpaceDeallocate {
        block_offset: u64,
        block_size: u32,
        block_type: u8,
    },

    /// Transaction start marker.
    TransactionBegin { tx_id: u64, timestamp: u64 },

    /// Transaction commit marker.
    TransactionCommit { tx_id: u64, timestamp: u64 },

    /// Transaction rollback marker.
    TransactionRollback { tx_id: u64, timestamp: u64 },

    /// Checkpoint marker: everything up to `checkpointed_lsn` is covered.
    Checkpoint {
        checkpointed_lsn: u64,
        timestamp: u64,
    },

    /// File header rewritten; carries both byte images.
    HeaderUpdate {
        header_offset: u64,
        old_data: Vec<u8>,
        new_data: Vec<u8>,
    },

    /// End-of-segment marker with a checksum over the segment.
    SegmentEnd { segment_lsn: u64, checksum: u32 },

    /// Transaction prepare marker with the number of records in the txn.
    TransactionPrepare {
        tx_id: u64,
        record_count: u64,
        timestamp: std::time::SystemTime,
    },

    /// Transaction abort marker with a human-readable reason.
    TransactionAbort {
        tx_id: u64,
        abort_reason: String,
        timestamp: std::time::SystemTime,
    },

    /// Savepoint created within `tx_id`.
    SavepointCreate {
        tx_id: u64,
        savepoint_id: String,
        timestamp: std::time::SystemTime,
    },

    /// Rollback to the named savepoint within `tx_id`.
    SavepointRollback {
        tx_id: u64,
        savepoint_id: String,
        timestamp: std::time::SystemTime,
    },

    /// Release of the named savepoint within `tx_id`.
    SavepointRelease {
        tx_id: u64,
        savepoint_id: String,
        timestamp: std::time::SystemTime,
    },

    /// A backup written to `backup_path`.
    BackupCreate {
        backup_id: String,
        backup_path: std::path::PathBuf,
        timestamp: std::time::SystemTime,
    },

    /// A backup restored from `backup_path` into `target_path`.
    BackupRestore {
        backup_id: String,
        backup_path: std::path::PathBuf,
        target_path: std::path::PathBuf,
        timestamp: std::time::SystemTime,
    },

    /// A lock taken by `tx_id` on `resource_id`.
    LockAcquire {
        tx_id: u64,
        resource_id: i64,
        // Encoded lock mode; semantics defined by the lock manager.
        lock_type: u8,
        timestamp: std::time::SystemTime,
    },

    /// A lock released by `tx_id` on `resource_id`.
    LockRelease {
        tx_id: u64,
        resource_id: i64,
        timestamp: std::time::SystemTime,
    },

    /// An index mutation identified by `index_id`.
    IndexUpdate {
        index_id: u32,
        // Encoded operation kind; semantics defined by the index layer.
        operation_type: u8,
        key_data: Vec<u8>,
        timestamp: std::time::SystemTime,
    },

    /// A statistics mutation.
    StatisticsUpdate {
        stats_type: u8,
        stats_data: Vec<u8>,
        timestamp: std::time::SystemTime,
    },

    /// A contiguous region reserved under transaction `txn_id`.
    AllocateContiguous {
        txn_id: u64,
        region: ContiguousRegion,
        timestamp: u64,
    },

    /// A previously reserved contiguous region made permanent.
    CommitContiguous {
        txn_id: u64,
        region: ContiguousRegion,
    },

    /// A previously reserved contiguous region discarded.
    RollbackContiguous { region: ContiguousRegion },

    /// A key-value pair written.
    KvSet {
        key: Vec<u8>,
        value_bytes: Vec<u8>,
        // Encoded value type tag.
        value_type: u8,
        // Optional time-to-live; `None` means no expiry recorded.
        ttl_seconds: Option<u64>,
        version: u64,
    },

    /// A key-value pair deleted, with the prior value if available.
    KvDelete {
        key: Vec<u8>,
        old_value_bytes: Option<Vec<u8>>,
        old_value_type: u8,
        old_version: u64,
    },
}
415
/// A contiguous byte region in the data file, optionally subdivided into
/// fixed-stride clusters.
///
/// Fixed 24-byte little-endian wire layout:
/// `start_offset (8) | total_size (8) | cluster_count (4) | stride (4)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContiguousRegion {
    /// Absolute file offset where the region begins.
    pub start_offset: u64,
    /// Region length in bytes.
    pub total_size: u64,
    /// Number of clusters carved out of the region (0 if unsubdivided).
    pub cluster_count: u32,
    /// Byte distance between consecutive cluster starts (0 if unsubdivided).
    pub stride: u32,
}

impl ContiguousRegion {
    /// Exact size of the serialized form in bytes.
    const SERIALIZED_LEN: usize = 24;

    /// Creates a region with no cluster subdivision.
    pub fn new(start_offset: u64, total_size: u64) -> Self {
        Self {
            start_offset,
            total_size,
            cluster_count: 0,
            stride: 0,
        }
    }

    /// Builder-style setter for the cluster layout.
    pub fn with_clusters(mut self, cluster_count: u32, stride: u32) -> Self {
        self.cluster_count = cluster_count;
        self.stride = stride;
        self
    }

    /// Exclusive end offset of the region.
    ///
    /// Uses saturating addition so that corrupt or adversarial on-disk
    /// values cannot trigger an overflow panic in debug builds; a saturated
    /// result still yields conservative (overlapping) answers in `overlaps`.
    pub fn end_offset(&self) -> u64 {
        self.start_offset.saturating_add(self.total_size)
    }

    /// True when the two half-open ranges `[start, end)` intersect.
    pub fn overlaps(&self, other: &ContiguousRegion) -> bool {
        self.start_offset < other.end_offset() && other.start_offset < self.end_offset()
    }

    /// Serializes into the fixed 24-byte little-endian layout.
    pub fn serialize(&self) -> Vec<u8> {
        let mut buffer = Vec::with_capacity(Self::SERIALIZED_LEN);
        buffer.extend_from_slice(&self.start_offset.to_le_bytes());
        buffer.extend_from_slice(&self.total_size.to_le_bytes());
        buffer.extend_from_slice(&self.cluster_count.to_le_bytes());
        buffer.extend_from_slice(&self.stride.to_le_bytes());
        buffer
    }

    /// Parses the fixed layout produced by [`Self::serialize`].
    ///
    /// Trailing bytes beyond the first 24 are ignored.
    ///
    /// # Errors
    /// Returns a descriptive message when fewer than 24 bytes are supplied.
    pub fn deserialize(data: &[u8]) -> Result<Self, String> {
        if data.len() < Self::SERIALIZED_LEN {
            return Err(format!(
                "Insufficient data for ContiguousRegion: expected 24, got {}",
                data.len()
            ));
        }

        // Slice bounds are guaranteed by the length check above.
        let start_offset = u64::from_le_bytes(data[0..8].try_into().unwrap());
        let total_size = u64::from_le_bytes(data[8..16].try_into().unwrap());
        let cluster_count = u32::from_le_bytes(data[16..20].try_into().unwrap());
        let stride = u32::from_le_bytes(data[20..24].try_into().unwrap());

        Ok(Self {
            start_offset,
            total_size,
            cluster_count,
            stride,
        })
    }
}
489
490impl V2WALRecord {
491 pub fn record_type(&self) -> V2WALRecordType {
493 match self {
494 Self::NodeInsert { .. } => V2WALRecordType::NodeInsert,
495 Self::NodeUpdate { .. } => V2WALRecordType::NodeUpdate,
496 Self::NodeDelete { .. } => V2WALRecordType::NodeDelete,
497 Self::ClusterCreate { .. } => V2WALRecordType::ClusterCreate,
498 Self::EdgeInsert { .. } => V2WALRecordType::EdgeInsert,
499 Self::EdgeUpdate { .. } => V2WALRecordType::EdgeUpdate,
500 Self::EdgeDelete { .. } => V2WALRecordType::EdgeDelete,
501 Self::StringInsert { .. } => V2WALRecordType::StringInsert,
502 Self::FreeSpaceAllocate { .. } => V2WALRecordType::FreeSpaceAllocate,
503 Self::FreeSpaceDeallocate { .. } => V2WALRecordType::FreeSpaceDeallocate,
504 Self::TransactionBegin { .. } => V2WALRecordType::TransactionBegin,
505 Self::TransactionCommit { .. } => V2WALRecordType::TransactionCommit,
506 Self::TransactionRollback { .. } => V2WALRecordType::TransactionRollback,
507 Self::Checkpoint { .. } => V2WALRecordType::Checkpoint,
508 Self::HeaderUpdate { .. } => V2WALRecordType::HeaderUpdate,
509 Self::SegmentEnd { .. } => V2WALRecordType::SegmentEnd,
510 Self::TransactionPrepare { .. } => V2WALRecordType::TransactionPrepare,
511 Self::TransactionAbort { .. } => V2WALRecordType::TransactionAbort,
512 Self::SavepointCreate { .. } => V2WALRecordType::SavepointCreate,
513 Self::SavepointRollback { .. } => V2WALRecordType::SavepointRollback,
514 Self::SavepointRelease { .. } => V2WALRecordType::SavepointRelease,
515 Self::BackupCreate { .. } => V2WALRecordType::BackupCreate,
516 Self::BackupRestore { .. } => V2WALRecordType::BackupRestore,
517 Self::LockAcquire { .. } => V2WALRecordType::LockAcquire,
518 Self::LockRelease { .. } => V2WALRecordType::LockRelease,
519 Self::IndexUpdate { .. } => V2WALRecordType::IndexUpdate,
520 Self::StatisticsUpdate { .. } => V2WALRecordType::StatisticsUpdate,
521 Self::AllocateContiguous { .. } => V2WALRecordType::AllocateContiguous,
522 Self::CommitContiguous { .. } => V2WALRecordType::CommitContiguous,
523 Self::RollbackContiguous { .. } => V2WALRecordType::RollbackContiguous,
524 Self::KvSet { .. } => V2WALRecordType::KvSet,
525 Self::KvDelete { .. } => V2WALRecordType::KvDelete,
526 }
527 }
528
529 pub fn cluster_key(&self) -> Option<i64> {
531 match self {
532 Self::NodeInsert { node_id, .. } => Some(*node_id),
533 Self::NodeUpdate { node_id, .. } => Some(*node_id),
534 Self::NodeDelete { node_id, .. } => Some(*node_id),
535 Self::ClusterCreate { node_id, .. } => Some(*node_id),
536 Self::EdgeInsert {
537 cluster_key: (node_id, _),
538 ..
539 } => Some(*node_id),
540 Self::EdgeUpdate {
541 cluster_key: (node_id, _),
542 ..
543 } => Some(*node_id),
544 Self::EdgeDelete {
545 cluster_key: (node_id, _),
546 ..
547 } => Some(*node_id),
548 _ => None,
549 }
550 }
551
552 pub fn serialized_size(&self) -> usize {
554 let base_size = std::mem::size_of::<V2WALRecordType>() + std::mem::size_of::<u32>(); match self {
557 Self::NodeInsert { node_data, .. } => base_size + 8 + 8 + 4 + node_data.len(),
558 Self::NodeUpdate {
559 old_data, new_data, ..
560 } => base_size + 8 + 8 + 4 + old_data.len() + 4 + new_data.len(),
561 Self::NodeDelete {
562 old_data,
563 outgoing_edges,
564 incoming_edges,
565 ..
566 } => {
567 let outgoing_size: usize = outgoing_edges.iter().map(|e| e.serialized_size()).sum();
568 let incoming_size: usize = incoming_edges.iter().map(|e| e.serialized_size()).sum();
569 base_size + 8 + 8 + 4 + old_data.len() + 4 + outgoing_size + 4 + incoming_size
570 }
571 Self::ClusterCreate { edge_data, .. } => base_size + 8 + 1 + 8 + 4 + edge_data.len(),
572 Self::EdgeInsert { edge_record, .. } => {
573 base_size + 8 + 1 + edge_record.serialized_size() + 4
574 }
575 Self::EdgeUpdate {
576 old_edge, new_edge, ..
577 } => base_size + 8 + 1 + old_edge.serialized_size() + new_edge.serialized_size() + 4,
578 Self::EdgeDelete { old_edge, .. } => base_size + 8 + 1 + old_edge.serialized_size() + 4,
579 Self::StringInsert { string_value, .. } => base_size + 4 + string_value.len(),
580 Self::FreeSpaceAllocate { .. } | Self::FreeSpaceDeallocate { .. } => {
581 base_size + 8 + 4 + 1
582 }
583 Self::TransactionBegin { .. }
584 | Self::TransactionCommit { .. }
585 | Self::TransactionRollback { .. } => base_size + 8 + 8,
586 Self::Checkpoint { .. } => base_size + 8 + 8,
587 Self::HeaderUpdate {
588 old_data, new_data, ..
589 } => base_size + 8 + old_data.len() + new_data.len(),
590 Self::SegmentEnd { .. } => base_size + 8 + 4,
591 Self::TransactionPrepare {
592 record_count: _, ..
593 } => base_size + 8 + 8 + 8,
594 Self::TransactionAbort { abort_reason, .. } => base_size + 8 + abort_reason.len(),
595 Self::SavepointCreate { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
596 Self::SavepointRollback { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
597 Self::SavepointRelease { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
598 Self::BackupCreate {
599 backup_id,
600 backup_path,
601 ..
602 } => base_size + backup_id.len() + backup_path.to_string_lossy().len(),
603 Self::BackupRestore {
604 backup_id,
605 backup_path,
606 target_path,
607 ..
608 } => {
609 base_size
610 + backup_id.len()
611 + backup_path.to_string_lossy().len()
612 + target_path.to_string_lossy().len()
613 }
614 Self::LockAcquire { .. } | Self::LockRelease { .. } => base_size + 8 + 8 + 1,
615 Self::IndexUpdate { .. } | Self::StatisticsUpdate { .. } => base_size,
616 Self::AllocateContiguous { .. } => base_size + 8 + 24 + 8, Self::CommitContiguous { .. } => base_size + 8 + 24, Self::RollbackContiguous { .. } => base_size + 24, Self::KvSet {
620 key,
621 value_bytes,
622 ttl_seconds,
623 ..
624 } => {
625 base_size
626 + 4
627 + key.len()
628 + 4
629 + value_bytes.len()
630 + 1
631 + 1
632 + (if ttl_seconds.is_some() { 8 } else { 0 })
633 + 8
634 }
635 Self::KvDelete {
636 key,
637 old_value_bytes,
638 ..
639 } => {
640 base_size
641 + 4
642 + key.len()
643 + 1
644 + old_value_bytes.as_ref().map(|v| 4 + v.len()).unwrap_or(0)
645 + 8
646 }
647 }
648 }
649
650 pub fn modifies_data(&self) -> bool {
652 self.record_type().requires_checkpoint()
653 }
654
655 pub fn is_transaction_control(&self) -> bool {
657 self.record_type().is_transaction_control()
658 }
659}
660
/// Errors produced while encoding or decoding WAL records.
///
/// NOTE(review): in this portion of the file the serializer reports
/// failures via `NativeBackendError` rather than this type — confirm
/// where this enum is actually raised.
#[derive(Debug, Clone)]
pub enum WALSerializationError {
    /// The record-type tag byte did not match any known `V2WALRecordType`.
    InvalidRecordType(u8),

    /// The buffer ended before the full record payload.
    InsufficientData {
        // Bytes required to decode the record.
        expected: usize,
        // Bytes actually available.
        actual: usize,
        // Record type being decoded when the shortfall was detected.
        record_type: V2WALRecordType,
    },

    /// Payload bytes failed a structural validity check.
    CorruptedData { location: String, details: String },

    /// An underlying I/O operation failed.
    IoError(String),

    /// A length did not fit in its on-disk integer width.
    SizeOverflow,
}
683
684impl std::fmt::Display for WALSerializationError {
685 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
686 match self {
687 Self::InvalidRecordType(t) => write!(f, "Invalid WAL record type: {}", t),
688 Self::InsufficientData {
689 expected,
690 actual,
691 record_type,
692 } => {
693 write!(
694 f,
695 "Insufficient data for {:?}: expected {}, got {}",
696 record_type, expected, actual
697 )
698 }
699 Self::CorruptedData { location, details } => {
700 write!(f, "Corrupted WAL data at {}: {}", location, details)
701 }
702 Self::IoError(msg) => write!(f, "I/O error during WAL serialization: {}", msg),
703 Self::SizeOverflow => write!(f, "Size overflow in WAL record data"),
704 }
705 }
706}
707
708impl std::error::Error for WALSerializationError {}
709
/// Stateless namespace for encoding `V2WALRecord`s to and from their
/// on-disk byte frames.
pub struct V2WALSerializer;
712
713impl V2WALSerializer {
714 pub fn serialize(record: &V2WALRecord) -> NativeResult<Vec<u8>> {
716 let mut buffer = Vec::with_capacity(record.serialized_size());
717
718 buffer.push(record.record_type() as u8);
720
721 let size_pos = buffer.len();
723 buffer.extend_from_slice(&[0u8; 4]);
724
725 let data_start = buffer.len();
726
727 match record {
729 V2WALRecord::NodeInsert {
730 node_id,
731 slot_offset,
732 node_data,
733 } => {
734 buffer.extend_from_slice(&node_id.to_le_bytes());
735 buffer.extend_from_slice(&slot_offset.to_le_bytes());
736 buffer.extend_from_slice(&(node_data.len() as u32).to_le_bytes());
737 buffer.extend_from_slice(node_data);
738 }
739
740 V2WALRecord::NodeUpdate {
741 node_id,
742 slot_offset,
743 old_data,
744 new_data,
745 } => {
746 buffer.extend_from_slice(&node_id.to_le_bytes());
747 buffer.extend_from_slice(&slot_offset.to_le_bytes());
748 buffer.extend_from_slice(&(old_data.len() as u32).to_le_bytes());
749 buffer.extend_from_slice(old_data);
750 buffer.extend_from_slice(&(new_data.len() as u32).to_le_bytes());
751 buffer.extend_from_slice(new_data);
752 }
753
754 V2WALRecord::NodeDelete {
755 node_id,
756 slot_offset,
757 old_data,
758 outgoing_edges,
759 incoming_edges,
760 } => {
761 buffer.extend_from_slice(&node_id.to_le_bytes());
762 buffer.extend_from_slice(&slot_offset.to_le_bytes());
763 buffer.extend_from_slice(&(old_data.len() as u32).to_le_bytes());
764 buffer.extend_from_slice(old_data);
765
766 buffer.extend_from_slice(&(outgoing_edges.len() as u32).to_le_bytes());
768 for edge in outgoing_edges {
769 buffer.extend_from_slice(&edge.serialize());
770 }
771
772 buffer.extend_from_slice(&(incoming_edges.len() as u32).to_le_bytes());
774 for edge in incoming_edges {
775 buffer.extend_from_slice(&edge.serialize());
776 }
777 }
778
779 V2WALRecord::ClusterCreate {
780 node_id,
781 direction,
782 cluster_offset,
783 cluster_size,
784 edge_data,
785 } => {
786 buffer.extend_from_slice(&node_id.to_le_bytes());
787 buffer.push(*direction as u8);
788 buffer.extend_from_slice(&cluster_offset.to_le_bytes());
789 buffer.extend_from_slice(&cluster_size.to_le_bytes());
790 buffer.extend_from_slice(&(edge_data.len() as u32).to_le_bytes());
791 buffer.extend_from_slice(edge_data);
792 }
793
794 V2WALRecord::EdgeInsert {
795 cluster_key,
796 edge_record,
797 insertion_point,
798 } => {
799 buffer.extend_from_slice(&cluster_key.0.to_le_bytes());
800 buffer.push(cluster_key.1 as u8);
801 buffer.extend_from_slice(&edge_record.as_bytes());
802 buffer.extend_from_slice(&insertion_point.to_le_bytes());
803 }
804
805 V2WALRecord::TransactionBegin { tx_id, timestamp } => {
806 buffer.extend_from_slice(&tx_id.to_le_bytes());
807 buffer.extend_from_slice(×tamp.to_le_bytes());
808 }
809
810 V2WALRecord::TransactionCommit { tx_id, timestamp } => {
811 buffer.extend_from_slice(&tx_id.to_le_bytes());
812 buffer.extend_from_slice(×tamp.to_le_bytes());
813 }
814
815 V2WALRecord::TransactionRollback { tx_id, timestamp } => {
816 buffer.extend_from_slice(&tx_id.to_le_bytes());
817 buffer.extend_from_slice(×tamp.to_le_bytes());
818 }
819
820 V2WALRecord::AllocateContiguous {
821 txn_id,
822 region,
823 timestamp,
824 } => {
825 buffer.extend_from_slice(&txn_id.to_le_bytes());
826 let region_bytes = region.serialize();
827 buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
828 buffer.extend_from_slice(®ion_bytes);
829 buffer.extend_from_slice(×tamp.to_le_bytes());
830 }
831
832 V2WALRecord::CommitContiguous { txn_id, region } => {
833 buffer.extend_from_slice(&txn_id.to_le_bytes());
834 let region_bytes = region.serialize();
835 buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
836 buffer.extend_from_slice(®ion_bytes);
837 }
838
839 V2WALRecord::RollbackContiguous { region } => {
840 let region_bytes = region.serialize();
841 buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
842 buffer.extend_from_slice(®ion_bytes);
843 }
844
845 V2WALRecord::KvSet {
846 key,
847 value_bytes,
848 value_type,
849 ttl_seconds,
850 version,
851 } => {
852 buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
854 buffer.extend_from_slice(key);
855 buffer.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
856 buffer.extend_from_slice(value_bytes);
857 buffer.push(*value_type);
858 match ttl_seconds {
860 Some(ttl) => {
861 buffer.push(1); buffer.extend_from_slice(&ttl.to_le_bytes());
863 }
864 None => {
865 buffer.push(0); }
867 }
868 buffer.extend_from_slice(&version.to_le_bytes());
869 }
870
871 V2WALRecord::KvDelete {
872 key,
873 old_value_bytes,
874 old_value_type,
875 old_version,
876 } => {
877 buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
879 buffer.extend_from_slice(key);
880 buffer.push(*old_value_type);
881 match old_value_bytes {
883 Some(value) => {
884 buffer.push(1); buffer.extend_from_slice(&(value.len() as u32).to_le_bytes());
886 buffer.extend_from_slice(value);
887 }
888 None => {
889 buffer.push(0); }
891 }
892 buffer.extend_from_slice(&old_version.to_le_bytes());
893 }
894
895 _ => {
897 return Err(NativeBackendError::CorruptStringTable {
898 reason: format!(
899 "WAL serialization error - unsupported record type: {:?}",
900 record.record_type()
901 ),
902 });
903 }
904 }
905
906 let record_size = buffer.len() - data_start;
908 let size_bytes = (record_size as u32).to_le_bytes();
909 buffer[size_pos..size_pos + 4].copy_from_slice(&size_bytes);
910
911 Ok(buffer)
912 }
913
914 pub fn deserialize(data: &[u8]) -> NativeResult<V2WALRecord> {
916 if data.is_empty() {
917 return Err(NativeBackendError::CorruptStringTable {
918 reason: "WAL deserialization error - empty data buffer".to_string(),
919 });
920 }
921
922 let record_type = V2WALRecordType::try_from(data[0])?;
923
924 if data.len() < 5 {
925 return Err(NativeBackendError::CorruptStringTable {
926 reason: "WAL deserialization error - insufficient data for record size".to_string(),
927 });
928 }
929
930 let record_size = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
931
932 if data.len() < 5 + record_size {
933 return Err(NativeBackendError::CorruptStringTable {
934 reason: format!(
935 "WAL deserialization error - insufficient data: expected {}, got {}",
936 record_size + 5,
937 data.len()
938 ),
939 });
940 }
941
942 let record_data = &data[5..5 + record_size];
943
944 match record_type {
946 V2WALRecordType::NodeInsert => {
947 if record_data.len() < 16 {
948 return Err(NativeBackendError::CorruptStringTable {
949 reason: "NodeInsert deserialization error - insufficient data for header"
950 .to_string(),
951 });
952 }
953
954 let node_id = i64::from_le_bytes(record_data[0..8].try_into().unwrap());
955 let slot_offset = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
956
957 if record_data.len() < 20 {
958 return Err(NativeBackendError::CorruptStringTable {
959 reason:
960 "NodeInsert deserialization error - insufficient data for size field"
961 .to_string(),
962 });
963 }
964
965 let data_len = u32::from_le_bytes(record_data[16..20].try_into().unwrap()) as usize;
966
967 if record_data.len() < 20 + data_len {
968 return Err(NativeBackendError::CorruptStringTable {
969 reason:
970 "NodeInsert deserialization error - insufficient data for node data"
971 .to_string(),
972 });
973 }
974
975 let node_data = record_data[20..20 + data_len].to_vec();
976
977 Ok(V2WALRecord::NodeInsert {
978 node_id,
979 slot_offset,
980 node_data,
981 })
982 }
983
984 V2WALRecordType::NodeDelete => {
985 if record_data.len() < 16 {
986 return Err(NativeBackendError::CorruptStringTable {
987 reason: "NodeDelete deserialization error - insufficient data for header"
988 .to_string(),
989 });
990 }
991
992 let node_id = i64::from_le_bytes(record_data[0..8].try_into().unwrap());
993 let slot_offset = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
994
995 if record_data.len() < 20 {
996 return Err(NativeBackendError::CorruptStringTable {
997 reason:
998 "NodeDelete deserialization error - insufficient data for size field"
999 .to_string(),
1000 });
1001 }
1002
1003 let data_len = u32::from_le_bytes(record_data[16..20].try_into().unwrap()) as usize;
1004
1005 if record_data.len() < 20 + data_len {
1006 return Err(NativeBackendError::CorruptStringTable {
1007 reason:
1008 "NodeDelete deserialization error - insufficient data for node data"
1009 .to_string(),
1010 });
1011 }
1012
1013 let old_data = record_data[20..20 + data_len].to_vec();
1014 let mut offset = 20 + data_len;
1015
1016 if record_data.len() < offset + 4 {
1018 return Err(NativeBackendError::CorruptStringTable {
1019 reason: "NodeDelete deserialization error - insufficient data for outgoing edge count".to_string(),
1020 });
1021 }
1022 let outgoing_count =
1023 u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
1024 as usize;
1025 offset += 4;
1026
1027 let mut outgoing_edges = Vec::with_capacity(outgoing_count);
1028 for _ in 0..outgoing_count {
1029 if record_data.len() < offset + 12 {
1030 return Err(NativeBackendError::CorruptStringTable {
1031 reason: "NodeDelete deserialization error - insufficient data for outgoing edge header".to_string(),
1032 });
1033 }
1034
1035 let edge_data_len = u16::from_be_bytes(
1037 record_data[offset + 10..offset + 12].try_into().unwrap(),
1038 ) as usize;
1039 let edge_total_len = 12 + edge_data_len;
1040
1041 if record_data.len() < offset + edge_total_len {
1042 return Err(NativeBackendError::CorruptStringTable {
1043 reason: "NodeDelete deserialization error - insufficient data for outgoing edge".to_string(),
1044 });
1045 }
1046
1047 let edge_bytes = &record_data[offset..offset + edge_total_len];
1048 match CompactEdgeRecord::deserialize(edge_bytes) {
1049 Ok(edge) => outgoing_edges.push(edge),
1050 Err(e) => {
1051 return Err(NativeBackendError::CorruptStringTable {
1052 reason: format!(
1053 "NodeDelete deserialization error - failed to deserialize outgoing edge: {:?}",
1054 e
1055 ),
1056 });
1057 }
1058 }
1059 offset += edge_total_len;
1060 }
1061
1062 if record_data.len() < offset + 4 {
1064 return Err(NativeBackendError::CorruptStringTable {
1065 reason: "NodeDelete deserialization error - insufficient data for incoming edge count".to_string(),
1066 });
1067 }
1068 let incoming_count =
1069 u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
1070 as usize;
1071 offset += 4;
1072
1073 let mut incoming_edges = Vec::with_capacity(incoming_count);
1074 for _ in 0..incoming_count {
1075 if record_data.len() < offset + 12 {
1076 return Err(NativeBackendError::CorruptStringTable {
1077 reason: "NodeDelete deserialization error - insufficient data for incoming edge header".to_string(),
1078 });
1079 }
1080
1081 let edge_data_len = u16::from_be_bytes(
1082 record_data[offset + 10..offset + 12].try_into().unwrap(),
1083 ) as usize;
1084 let edge_total_len = 12 + edge_data_len;
1085
1086 if record_data.len() < offset + edge_total_len {
1087 return Err(NativeBackendError::CorruptStringTable {
1088 reason: "NodeDelete deserialization error - insufficient data for incoming edge".to_string(),
1089 });
1090 }
1091
1092 let edge_bytes = &record_data[offset..offset + edge_total_len];
1093 match CompactEdgeRecord::deserialize(edge_bytes) {
1094 Ok(edge) => incoming_edges.push(edge),
1095 Err(e) => {
1096 return Err(NativeBackendError::CorruptStringTable {
1097 reason: format!(
1098 "NodeDelete deserialization error - failed to deserialize incoming edge: {:?}",
1099 e
1100 ),
1101 });
1102 }
1103 }
1104 offset += edge_total_len;
1105 }
1106
1107 Ok(V2WALRecord::NodeDelete {
1108 node_id,
1109 slot_offset,
1110 old_data,
1111 outgoing_edges,
1112 incoming_edges,
1113 })
1114 }
1115
1116 V2WALRecordType::TransactionBegin => {
1117 if record_data.len() < 16 {
1118 return Err(NativeBackendError::CorruptStringTable {
1119 reason: "TransactionBegin deserialization error - insufficient data"
1120 .to_string(),
1121 });
1122 }
1123
1124 let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1125 let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
1126
1127 Ok(V2WALRecord::TransactionBegin { tx_id, timestamp })
1128 }
1129
1130 V2WALRecordType::TransactionCommit => {
1131 if record_data.len() < 16 {
1132 return Err(NativeBackendError::CorruptStringTable {
1133 reason: "TransactionCommit deserialization error - insufficient data"
1134 .to_string(),
1135 });
1136 }
1137
1138 let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1139 let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
1140
1141 Ok(V2WALRecord::TransactionCommit { tx_id, timestamp })
1142 }
1143
1144 V2WALRecordType::TransactionRollback => {
1145 if record_data.len() < 16 {
1146 return Err(NativeBackendError::CorruptStringTable {
1147 reason: "TransactionRollback deserialization error - insufficient data"
1148 .to_string(),
1149 });
1150 }
1151
1152 let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1153 let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
1154
1155 Ok(V2WALRecord::TransactionRollback { tx_id, timestamp })
1156 }
1157
1158 V2WALRecordType::AllocateContiguous => {
1159 if record_data.len() < 40 {
1160 return Err(NativeBackendError::CorruptStringTable {
1161 reason: format!(
1162 "AllocateContiguous deserialization error - insufficient data: expected 40, got {}",
1163 record_data.len()
1164 ),
1165 });
1166 }
1167
1168 let txn_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1169 let region_len =
1170 u32::from_le_bytes(record_data[8..12].try_into().unwrap()) as usize;
1171 let region_bytes = &record_data[12..12 + region_len];
1172 let timestamp = u64::from_le_bytes(
1173 record_data[12 + region_len..20 + region_len]
1174 .try_into()
1175 .unwrap(),
1176 );
1177
1178 let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
1179 NativeBackendError::CorruptStringTable {
1180 reason: format!(
1181 "AllocateContiguous deserialization error - invalid region: {}",
1182 e
1183 ),
1184 }
1185 })?;
1186
1187 Ok(V2WALRecord::AllocateContiguous {
1188 txn_id,
1189 region,
1190 timestamp,
1191 })
1192 }
1193
1194 V2WALRecordType::CommitContiguous => {
1195 if record_data.len() < 12 {
1196 return Err(NativeBackendError::CorruptStringTable {
1197 reason: "CommitContiguous deserialization error - insufficient data"
1198 .to_string(),
1199 });
1200 }
1201
1202 let txn_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
1203 let region_len =
1204 u32::from_le_bytes(record_data[8..12].try_into().unwrap()) as usize;
1205
1206 if record_data.len() < 12 + region_len {
1207 return Err(NativeBackendError::CorruptStringTable {
1208 reason:
1209 "CommitContiguous deserialization error - insufficient data for region"
1210 .to_string(),
1211 });
1212 }
1213
1214 let region_bytes = &record_data[12..12 + region_len];
1215 let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
1216 NativeBackendError::CorruptStringTable {
1217 reason: format!(
1218 "CommitContiguous deserialization error - invalid region: {}",
1219 e
1220 ),
1221 }
1222 })?;
1223
1224 Ok(V2WALRecord::CommitContiguous { txn_id, region })
1225 }
1226
            V2WALRecordType::RollbackContiguous => {
                // Wire layout: [region_len: u32 LE][region: region_len bytes].
                // Unlike CommitContiguous there is no txn_id field in this record.
                if record_data.len() < 4 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason: "RollbackContiguous deserialization error - insufficient data"
                            .to_string(),
                    });
                }

                let region_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;

                // Bounds-check the declared length before slicing the region bytes.
                if record_data.len() < 4 + region_len {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason: "RollbackContiguous deserialization error - insufficient data for region".to_string(),
                    });
                }

                let region_bytes = &record_data[4..4 + region_len];
                // Delegate region decoding; wrap its error with record-type context.
                let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
                    NativeBackendError::CorruptStringTable {
                        reason: format!(
                            "RollbackContiguous deserialization error - invalid region: {}",
                            e
                        ),
                    }
                })?;

                Ok(V2WALRecord::RollbackContiguous { region })
            }
1255
            V2WALRecordType::KvSet => {
                // Wire layout:
                //   [key_len: u32 LE][key: key_len bytes]
                //   [value_len: u32 LE][value: value_len bytes]
                //   [value_type: u8][ttl_flag: u8]
                //   [ttl: u64 LE — present only when ttl_flag == 1]
                //   [version: u64 LE]
                if record_data.len() < 4 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason: "KvSet deserialization error - insufficient data for key length"
                            .to_string(),
                    });
                }

                let key_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;

                // Need the key itself plus the 4-byte value-length field that follows it.
                if record_data.len() < 4 + key_len + 4 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason: "KvSet deserialization error - insufficient data for key and value length".to_string(),
                    });
                }

                let key = record_data[4..4 + key_len].to_vec();
                let offset = 4 + key_len;

                let value_len =
                    u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
                        as usize;

                // Minimum remaining bytes: value + value_type (1) + ttl_flag (1) + version (8).
                // This is the no-TTL case; the TTL branch re-checks for its extra 8 bytes.
                if record_data.len() < offset + 4 + value_len + 1 + 1 + 8 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason:
                            "KvSet deserialization error - insufficient data for value and metadata"
                                .to_string(),
                    });
                }

                let value_bytes = record_data[offset + 4..offset + 4 + value_len].to_vec();
                // Shadowed offsets advance a running cursor through the buffer.
                let offset = offset + 4 + value_len;

                let value_type = record_data[offset];
                let offset = offset + 1;

                // TTL is optional: a 1-byte flag decides whether 8 TTL bytes follow.
                let ttl_flag = record_data[offset];
                let ttl_seconds = if ttl_flag == 1 {
                    if record_data.len() < offset + 1 + 8 {
                        return Err(NativeBackendError::CorruptStringTable {
                            reason: "KvSet deserialization error - insufficient data for TTL"
                                .to_string(),
                        });
                    }
                    let ttl = u64::from_le_bytes(
                        record_data[offset + 1..offset + 1 + 8].try_into().unwrap(),
                    );
                    Some(ttl)
                } else {
                    None
                };
                // Skip the flag byte plus the 8-byte TTL payload when it was present.
                let offset = offset + 1 + ttl_seconds.map_or(0, |_| 8);

                if record_data.len() < offset + 8 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason: "KvSet deserialization error - insufficient data for version"
                            .to_string(),
                    });
                }

                let version =
                    u64::from_le_bytes(record_data[offset..offset + 8].try_into().unwrap());

                Ok(V2WALRecord::KvSet {
                    key,
                    value_bytes,
                    value_type,
                    ttl_seconds,
                    version,
                })
            }
1329
            V2WALRecordType::KvDelete => {
                // Wire layout:
                //   [key_len: u32 LE][key: key_len bytes]
                //   [old_value_type: u8][old_value_flag: u8]
                //   [old_value_len: u32 LE][old_value] — present only when old_value_flag == 1
                //   [old_version: u64 LE]
                if record_data.len() < 4 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason: "KvDelete deserialization error - insufficient data for key length"
                            .to_string(),
                    });
                }

                let key_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;

                // Need the key itself plus the 1-byte old_value_type that follows it.
                if record_data.len() < 4 + key_len + 1 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason:
                            "KvDelete deserialization error - insufficient data for key and type"
                                .to_string(),
                    });
                }

                let key = record_data[4..4 + key_len].to_vec();
                // Shadowed offsets advance a running cursor through the buffer.
                let offset = 4 + key_len;

                let old_value_type = record_data[offset];
                let offset = offset + 1;

                if record_data.len() < offset + 1 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason:
                            "KvDelete deserialization error - insufficient data for old value flag"
                                .to_string(),
                    });
                }

                // The previous value is optional: a 1-byte flag decides whether a
                // length-prefixed old-value payload follows.
                let old_value_flag = record_data[offset];
                let old_value_bytes = if old_value_flag == 1 {
                    if record_data.len() < offset + 1 + 4 {
                        return Err(NativeBackendError::CorruptStringTable {
                            reason: "KvDelete deserialization error - insufficient data for old value length".to_string(),
                        });
                    }
                    let old_value_len = u32::from_le_bytes(
                        record_data[offset + 1..offset + 1 + 4].try_into().unwrap(),
                    ) as usize;

                    // Bounds-check the declared old-value length before slicing.
                    if record_data.len() < offset + 1 + 4 + old_value_len {
                        return Err(NativeBackendError::CorruptStringTable {
                            reason:
                                "KvDelete deserialization error - insufficient data for old value"
                                    .to_string(),
                        });
                    }

                    let value =
                        record_data[offset + 1 + 4..offset + 1 + 4 + old_value_len].to_vec();
                    Some(value)
                } else {
                    None
                };
                // Skip the flag byte, plus the length prefix and payload when present.
                let offset = offset + 1 + old_value_bytes.as_ref().map_or(0, |v| 4 + v.len());

                if record_data.len() < offset + 8 {
                    return Err(NativeBackendError::CorruptStringTable {
                        reason:
                            "KvDelete deserialization error - insufficient data for old version"
                                .to_string(),
                    });
                }

                let old_version =
                    u64::from_le_bytes(record_data[offset..offset + 8].try_into().unwrap());

                Ok(V2WALRecord::KvDelete {
                    key,
                    old_value_bytes,
                    old_value_type,
                    old_version,
                })
            }
1408
            // Any record type without an explicit arm above is valid as a type tag
            // but has no deserialization support here; report it as corruption
            // rather than guessing at a payload layout.
            _ => Err(NativeBackendError::CorruptStringTable {
                reason: format!(
                    "WAL deserialization error - unsupported record type: {:?}",
                    record_type
                ),
            }),
1416 }
1417 }
1418}
1419
#[cfg(test)]
mod tests {
    use super::*;

    /// Data-modifying record types must require a checkpoint; pure
    /// transaction-control types must not.
    #[test]
    fn test_record_type_properties() {
        assert!(V2WALRecordType::TransactionBegin.is_transaction_control());
        assert!(!V2WALRecordType::TransactionBegin.requires_checkpoint());
        assert!(V2WALRecordType::NodeInsert.requires_checkpoint());
    }

    /// Node records are keyed by their node id; transaction-control records
    /// carry no cluster key.
    #[test]
    fn test_v2_wal_record_cluster_key() {
        let node_record = V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        };
        let txn_record = V2WALRecord::TransactionBegin {
            tx_id: 100,
            timestamp: 123456,
        };

        assert_eq!(node_record.cluster_key(), Some(42));
        assert_eq!(txn_record.cluster_key(), None);
    }

    /// Serializing and deserializing a NodeInsert must preserve every field.
    #[test]
    fn test_record_serialization_roundtrip() {
        let original = V2WALRecord::NodeInsert {
            node_id: 123,
            slot_offset: 4096,
            node_data: vec![4, 5, 6, 7, 8],
        };

        let bytes = V2WALSerializer::serialize(&original).unwrap();
        let restored = V2WALSerializer::deserialize(&bytes).unwrap();

        if let V2WALRecord::NodeInsert {
            node_id,
            slot_offset,
            node_data,
        } = restored
        {
            assert_eq!(node_id, 123);
            assert_eq!(slot_offset, 4096);
            assert_eq!(node_data, vec![4, 5, 6, 7, 8]);
        } else {
            panic!("Record type mismatch after roundtrip");
        }
    }

    /// `serialized_size` is an upper-bound estimate: it must never be smaller
    /// than the actual encoded length.
    #[test]
    fn test_serialized_size_estimation() {
        let record = V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3, 4, 5],
        };

        let estimate = record.serialized_size();
        let encoded = V2WALSerializer::serialize(&record).unwrap();

        assert!(estimate >= encoded.len());
    }

    /// A KvSet record with a TTL must survive a full encode/decode cycle intact.
    #[test]
    fn test_kv_set_serialization_roundtrip() {
        let original = V2WALRecord::KvSet {
            key: b"test_key".to_vec(),
            value_bytes: b"test_value".to_vec(),
            value_type: 1,
            ttl_seconds: Some(3600),
            version: 12345,
        };

        let bytes = V2WALSerializer::serialize(&original).unwrap();
        let restored = V2WALSerializer::deserialize(&bytes).unwrap();

        if let V2WALRecord::KvSet {
            key,
            value_bytes,
            value_type,
            ttl_seconds,
            version,
        } = restored
        {
            assert_eq!(key, b"test_key".to_vec());
            assert_eq!(value_bytes, b"test_value".to_vec());
            assert_eq!(value_type, 1);
            assert_eq!(ttl_seconds, Some(3600));
            assert_eq!(version, 12345);
        } else {
            panic!("Record type mismatch after KV set roundtrip");
        }
    }

    /// A KvDelete record carrying the old value must round-trip unchanged.
    #[test]
    fn test_kv_delete_serialization_roundtrip() {
        let original = V2WALRecord::KvDelete {
            key: b"test_key".to_vec(),
            old_value_bytes: Some(b"old_value".to_vec()),
            old_value_type: 1,
            old_version: 12344,
        };

        let bytes = V2WALSerializer::serialize(&original).unwrap();
        let restored = V2WALSerializer::deserialize(&bytes).unwrap();

        if let V2WALRecord::KvDelete {
            key,
            old_value_bytes,
            old_value_type,
            old_version,
        } = restored
        {
            assert_eq!(key, b"test_key".to_vec());
            assert_eq!(old_value_bytes, Some(b"old_value".to_vec()));
            assert_eq!(old_value_type, 1);
            assert_eq!(old_version, 12344);
        } else {
            panic!("Record type mismatch after KV delete roundtrip");
        }
    }

    /// The optional TTL must stay absent through a round trip.
    #[test]
    fn test_kv_set_without_ttl() {
        let record = V2WALRecord::KvSet {
            key: b"no_ttl_key".to_vec(),
            value_bytes: b"value".to_vec(),
            value_type: 0,
            ttl_seconds: None,
            version: 1,
        };

        let bytes = V2WALSerializer::serialize(&record).unwrap();
        let restored = V2WALSerializer::deserialize(&bytes).unwrap();

        if let V2WALRecord::KvSet { ttl_seconds, .. } = restored {
            assert_eq!(ttl_seconds, None);
        } else {
            panic!("Wrong record type");
        }
    }

    /// The optional old value must stay absent through a round trip.
    #[test]
    fn test_kv_delete_no_old_value() {
        let record = V2WALRecord::KvDelete {
            key: b"no_old_value".to_vec(),
            old_value_bytes: None,
            old_value_type: 0,
            old_version: 0,
        };

        let bytes = V2WALSerializer::serialize(&record).unwrap();
        let restored = V2WALSerializer::deserialize(&bytes).unwrap();

        if let V2WALRecord::KvDelete {
            old_value_bytes, ..
        } = restored
        {
            assert_eq!(old_value_bytes, None);
        } else {
            panic!("Wrong record type");
        }
    }
}