use crate::backend::native::v2::edge_cluster::{CompactEdgeRecord, Direction};
use crate::backend::native::{NativeBackendError, NativeResult};
/// Discriminant tag for every record kind that can appear in the v2 WAL.
///
/// The `#[repr(u8)]` discriminants are the on-disk wire values; they must
/// never be renumbered once written to a log.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum V2WALRecordType {
    NodeInsert = 1,
    NodeUpdate = 2,
    NodeDelete = 3,
    ClusterCreate = 4,
    EdgeInsert = 5,
    EdgeUpdate = 6,
    EdgeDelete = 7,
    StringInsert = 8,
    FreeSpaceAllocate = 9,
    FreeSpaceDeallocate = 10,
    TransactionBegin = 11,
    TransactionCommit = 12,
    TransactionRollback = 13,
    Checkpoint = 14,
    HeaderUpdate = 15,
    SegmentEnd = 16,
    TransactionPrepare = 17,
    TransactionAbort = 18,
    SavepointCreate = 19,
    SavepointRollback = 20,
    SavepointRelease = 21,
    BackupCreate = 22,
    BackupRestore = 23,
    LockAcquire = 24,
    LockRelease = 25,
    IndexUpdate = 26,
    StatisticsUpdate = 27,
    AllocateContiguous = 28,
    CommitContiguous = 29,
    RollbackContiguous = 30,
    KvSet = 31,
    KvDelete = 32,
}

impl V2WALRecordType {
    /// Record types whose replay mutates persisted data; segments containing
    /// them can only be reclaimed after a checkpoint covers them.
    pub fn data_modifying() -> &'static [V2WALRecordType] {
        const DATA_MODIFYING: &[V2WALRecordType] = &[
            V2WALRecordType::NodeInsert,
            V2WALRecordType::NodeUpdate,
            V2WALRecordType::NodeDelete,
            V2WALRecordType::ClusterCreate,
            V2WALRecordType::EdgeInsert,
            V2WALRecordType::EdgeUpdate,
            V2WALRecordType::EdgeDelete,
            V2WALRecordType::StringInsert,
            V2WALRecordType::FreeSpaceAllocate,
            V2WALRecordType::FreeSpaceDeallocate,
            V2WALRecordType::HeaderUpdate,
            V2WALRecordType::AllocateContiguous,
            V2WALRecordType::CommitContiguous,
            V2WALRecordType::RollbackContiguous,
            V2WALRecordType::KvSet,
            V2WALRecordType::KvDelete,
        ];
        DATA_MODIFYING
    }

    /// Record types that delimit transaction boundaries (begin/commit/rollback).
    pub fn transaction_control() -> &'static [V2WALRecordType] {
        const TRANSACTION_CONTROL: &[V2WALRecordType] = &[
            V2WALRecordType::TransactionBegin,
            V2WALRecordType::TransactionCommit,
            V2WALRecordType::TransactionRollback,
        ];
        TRANSACTION_CONTROL
    }

    /// True when this type is listed in [`Self::data_modifying`].
    pub fn requires_checkpoint(&self) -> bool {
        Self::data_modifying().iter().any(|candidate| candidate == self)
    }

    /// True when this type is listed in [`Self::transaction_control`].
    pub fn is_transaction_control(&self) -> bool {
        Self::transaction_control().iter().any(|candidate| candidate == self)
    }
}
impl TryFrom<u8> for V2WALRecordType {
type Error = NativeBackendError;
fn try_from(value: u8) -> NativeResult<Self> {
match value {
1 => Ok(Self::NodeInsert),
2 => Ok(Self::NodeUpdate),
3 => Ok(Self::NodeDelete),
4 => Ok(Self::ClusterCreate),
5 => Ok(Self::EdgeInsert),
6 => Ok(Self::EdgeUpdate),
7 => Ok(Self::EdgeDelete),
8 => Ok(Self::StringInsert),
9 => Ok(Self::FreeSpaceAllocate),
10 => Ok(Self::FreeSpaceDeallocate),
11 => Ok(Self::TransactionBegin),
12 => Ok(Self::TransactionCommit),
13 => Ok(Self::TransactionRollback),
14 => Ok(Self::Checkpoint),
15 => Ok(Self::HeaderUpdate),
16 => Ok(Self::SegmentEnd),
17 => Ok(Self::TransactionPrepare),
18 => Ok(Self::TransactionAbort),
19 => Ok(Self::SavepointCreate),
20 => Ok(Self::SavepointRollback),
21 => Ok(Self::SavepointRelease),
22 => Ok(Self::BackupCreate),
23 => Ok(Self::BackupRestore),
24 => Ok(Self::LockAcquire),
25 => Ok(Self::LockRelease),
26 => Ok(Self::IndexUpdate),
27 => Ok(Self::StatisticsUpdate),
28 => Ok(Self::AllocateContiguous),
29 => Ok(Self::CommitContiguous),
30 => Ok(Self::RollbackContiguous),
31 => Ok(Self::KvSet),
32 => Ok(Self::KvDelete),
_ => Err(NativeBackendError::CorruptStringTable {
reason: format!("unknown WAL record type: {}", value),
}),
}
}
}
/// One logical WAL entry. Each variant carries enough state to redo the
/// operation and, where `old_*` fields exist, to undo it during rollback.
///
/// Note the mixed timestamp representations: the core transaction records use
/// raw `u64` timestamps while the later variants use `std::time::SystemTime`.
#[derive(Debug, Clone)]
pub enum V2WALRecord {
    // --- Node operations (keyed by node_id, see `cluster_key`) ---
    NodeInsert {
        node_id: i64,
        slot_offset: u64,
        node_data: Vec<u8>,
    },
    NodeUpdate {
        node_id: i64,
        slot_offset: u64,
        old_data: Vec<u8>,
        new_data: Vec<u8>,
    },
    NodeDelete {
        node_id: i64,
        slot_offset: u64,
        old_data: Vec<u8>,
        // Edges removed alongside the node, retained so a rollback can restore them.
        outgoing_edges: Vec<CompactEdgeRecord>,
        incoming_edges: Vec<CompactEdgeRecord>,
    },
    // --- Edge-cluster operations ---
    ClusterCreate {
        node_id: i64,
        direction: Direction,
        cluster_offset: u64,
        cluster_size: u32,
        edge_data: Vec<u8>,
    },
    EdgeInsert {
        cluster_key: (i64, Direction), edge_record: CompactEdgeRecord,
        insertion_point: u32,
    },
    EdgeUpdate {
        cluster_key: (i64, Direction),
        old_edge: CompactEdgeRecord,
        new_edge: CompactEdgeRecord,
        position: u32,
    },
    EdgeDelete {
        cluster_key: (i64, Direction),
        old_edge: CompactEdgeRecord,
        position: u32,
    },
    StringInsert {
        string_id: u32,
        string_value: String,
    },
    // --- Free-space bookkeeping ---
    FreeSpaceAllocate {
        block_offset: u64,
        block_size: u32,
        block_type: u8,
    },
    FreeSpaceDeallocate {
        block_offset: u64,
        block_size: u32,
        block_type: u8,
    },
    // --- Transaction lifecycle (raw u64 timestamps) ---
    TransactionBegin { tx_id: u64, timestamp: u64 },
    TransactionCommit { tx_id: u64, timestamp: u64 },
    TransactionRollback { tx_id: u64, timestamp: u64 },
    Checkpoint {
        checkpointed_lsn: u64,
        timestamp: u64,
    },
    HeaderUpdate {
        header_offset: u64,
        old_data: Vec<u8>,
        new_data: Vec<u8>,
    },
    SegmentEnd { segment_lsn: u64, checksum: u32 },
    // --- Extended transaction / savepoint records (SystemTime timestamps) ---
    TransactionPrepare {
        tx_id: u64,
        record_count: u64,
        timestamp: std::time::SystemTime,
    },
    TransactionAbort {
        tx_id: u64,
        abort_reason: String,
        timestamp: std::time::SystemTime,
    },
    SavepointCreate {
        tx_id: u64,
        savepoint_id: String,
        timestamp: std::time::SystemTime,
    },
    SavepointRollback {
        tx_id: u64,
        savepoint_id: String,
        timestamp: std::time::SystemTime,
    },
    SavepointRelease {
        tx_id: u64,
        savepoint_id: String,
        timestamp: std::time::SystemTime,
    },
    // --- Administrative records ---
    BackupCreate {
        backup_id: String,
        backup_path: std::path::PathBuf,
        timestamp: std::time::SystemTime,
    },
    BackupRestore {
        backup_id: String,
        backup_path: std::path::PathBuf,
        target_path: std::path::PathBuf,
        timestamp: std::time::SystemTime,
    },
    LockAcquire {
        tx_id: u64,
        resource_id: i64,
        lock_type: u8,
        timestamp: std::time::SystemTime,
    },
    LockRelease {
        tx_id: u64,
        resource_id: i64,
        timestamp: std::time::SystemTime,
    },
    IndexUpdate {
        index_id: u32,
        operation_type: u8,
        key_data: Vec<u8>,
        timestamp: std::time::SystemTime,
    },
    StatisticsUpdate {
        stats_type: u8,
        stats_data: Vec<u8>,
        timestamp: std::time::SystemTime,
    },
    // --- Contiguous-region allocation protocol ---
    AllocateContiguous {
        txn_id: u64,
        region: ContiguousRegion,
        timestamp: u64,
    },
    CommitContiguous {
        txn_id: u64,
        region: ContiguousRegion,
    },
    RollbackContiguous { region: ContiguousRegion },
    // --- Key/value store operations ---
    KvSet {
        key: Vec<u8>,
        value_bytes: Vec<u8>,
        value_type: u8,
        ttl_seconds: Option<u64>,
        version: u64,
    },
    KvDelete {
        key: Vec<u8>,
        // Previous value retained (when known) so the delete can be undone.
        old_value_bytes: Option<Vec<u8>>,
        old_value_type: u8,
        old_version: u64,
    },
}
/// A contiguous byte range reserved for a batch of edge clusters.
///
/// Serialized as 24 bytes, all little-endian:
/// `start_offset: u64 | total_size: u64 | cluster_count: u32 | stride: u32`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContiguousRegion {
    pub start_offset: u64,
    pub total_size: u64,
    pub cluster_count: u32,
    pub stride: u32,
}

impl ContiguousRegion {
    /// Creates a region with no cluster layout information yet.
    pub fn new(start_offset: u64, total_size: u64) -> Self {
        Self {
            start_offset,
            total_size,
            cluster_count: 0,
            stride: 0,
        }
    }

    /// Builder-style setter for the cluster layout within the region.
    pub fn with_clusters(mut self, cluster_count: u32, stride: u32) -> Self {
        self.cluster_count = cluster_count;
        self.stride = stride;
        self
    }

    /// Exclusive end offset of the region.
    ///
    /// Uses saturating arithmetic so a corrupt/hostile `start_offset` +
    /// `total_size` near `u64::MAX` cannot panic (debug) or wrap (release);
    /// a wrapped end would also make `overlaps` report false negatives.
    pub fn end_offset(&self) -> u64 {
        self.start_offset.saturating_add(self.total_size)
    }

    /// True when the two half-open ranges `[start, end)` intersect.
    pub fn overlaps(&self, other: &ContiguousRegion) -> bool {
        self.start_offset < other.end_offset() && other.start_offset < self.end_offset()
    }

    /// Encodes the region into its fixed 24-byte little-endian form.
    pub fn serialize(&self) -> Vec<u8> {
        let mut buffer = Vec::with_capacity(24);
        buffer.extend_from_slice(&self.start_offset.to_le_bytes());
        buffer.extend_from_slice(&self.total_size.to_le_bytes());
        buffer.extend_from_slice(&self.cluster_count.to_le_bytes());
        buffer.extend_from_slice(&self.stride.to_le_bytes());
        buffer
    }

    /// Decodes a region from at least 24 bytes; extra trailing bytes are ignored.
    ///
    /// # Errors
    /// Returns a descriptive message when fewer than 24 bytes are supplied.
    pub fn deserialize(data: &[u8]) -> Result<Self, String> {
        if data.len() < 24 {
            return Err(format!(
                "Insufficient data for ContiguousRegion: expected 24, got {}",
                data.len()
            ));
        }
        // Slices are exactly 8/8/4/4 bytes, so `try_into` cannot fail here.
        let start_offset = u64::from_le_bytes(data[0..8].try_into().unwrap());
        let total_size = u64::from_le_bytes(data[8..16].try_into().unwrap());
        let cluster_count = u32::from_le_bytes(data[16..20].try_into().unwrap());
        let stride = u32::from_le_bytes(data[20..24].try_into().unwrap());
        Ok(Self {
            start_offset,
            total_size,
            cluster_count,
            stride,
        })
    }
}
impl V2WALRecord {
    /// Maps a record to its wire discriminant tag.
    pub fn record_type(&self) -> V2WALRecordType {
        match self {
            Self::NodeInsert { .. } => V2WALRecordType::NodeInsert,
            Self::NodeUpdate { .. } => V2WALRecordType::NodeUpdate,
            Self::NodeDelete { .. } => V2WALRecordType::NodeDelete,
            Self::ClusterCreate { .. } => V2WALRecordType::ClusterCreate,
            Self::EdgeInsert { .. } => V2WALRecordType::EdgeInsert,
            Self::EdgeUpdate { .. } => V2WALRecordType::EdgeUpdate,
            Self::EdgeDelete { .. } => V2WALRecordType::EdgeDelete,
            Self::StringInsert { .. } => V2WALRecordType::StringInsert,
            Self::FreeSpaceAllocate { .. } => V2WALRecordType::FreeSpaceAllocate,
            Self::FreeSpaceDeallocate { .. } => V2WALRecordType::FreeSpaceDeallocate,
            Self::TransactionBegin { .. } => V2WALRecordType::TransactionBegin,
            Self::TransactionCommit { .. } => V2WALRecordType::TransactionCommit,
            Self::TransactionRollback { .. } => V2WALRecordType::TransactionRollback,
            Self::Checkpoint { .. } => V2WALRecordType::Checkpoint,
            Self::HeaderUpdate { .. } => V2WALRecordType::HeaderUpdate,
            Self::SegmentEnd { .. } => V2WALRecordType::SegmentEnd,
            Self::TransactionPrepare { .. } => V2WALRecordType::TransactionPrepare,
            Self::TransactionAbort { .. } => V2WALRecordType::TransactionAbort,
            Self::SavepointCreate { .. } => V2WALRecordType::SavepointCreate,
            Self::SavepointRollback { .. } => V2WALRecordType::SavepointRollback,
            Self::SavepointRelease { .. } => V2WALRecordType::SavepointRelease,
            Self::BackupCreate { .. } => V2WALRecordType::BackupCreate,
            Self::BackupRestore { .. } => V2WALRecordType::BackupRestore,
            Self::LockAcquire { .. } => V2WALRecordType::LockAcquire,
            Self::LockRelease { .. } => V2WALRecordType::LockRelease,
            Self::IndexUpdate { .. } => V2WALRecordType::IndexUpdate,
            Self::StatisticsUpdate { .. } => V2WALRecordType::StatisticsUpdate,
            Self::AllocateContiguous { .. } => V2WALRecordType::AllocateContiguous,
            Self::CommitContiguous { .. } => V2WALRecordType::CommitContiguous,
            Self::RollbackContiguous { .. } => V2WALRecordType::RollbackContiguous,
            Self::KvSet { .. } => V2WALRecordType::KvSet,
            Self::KvDelete { .. } => V2WALRecordType::KvDelete,
        }
    }
    /// The node id this record touches, for node/edge records; `None` for
    /// everything else (transactions, checkpoints, KV, admin records).
    pub fn cluster_key(&self) -> Option<i64> {
        match self {
            Self::NodeInsert { node_id, .. } => Some(*node_id),
            Self::NodeUpdate { node_id, .. } => Some(*node_id),
            Self::NodeDelete { node_id, .. } => Some(*node_id),
            Self::ClusterCreate { node_id, .. } => Some(*node_id),
            Self::EdgeInsert {
                cluster_key: (node_id, _),
                ..
            } => Some(*node_id),
            Self::EdgeUpdate {
                cluster_key: (node_id, _),
                ..
            } => Some(*node_id),
            Self::EdgeDelete {
                cluster_key: (node_id, _),
                ..
            } => Some(*node_id),
            _ => None,
        }
    }
    /// Best-effort size estimate used as a `Vec` capacity hint by the
    /// serializer; it is not guaranteed to be exact for every variant.
    ///
    /// NOTE(review): the contiguous-region arms below omit the 4-byte region
    /// length prefix the serializer writes, so those estimates run 4 bytes
    /// short (harmless for a capacity hint, but worth confirming/fixing).
    pub fn serialized_size(&self) -> usize {
        // Frame overhead: 1-byte type tag (repr(u8)) + 4-byte payload length.
        let base_size = std::mem::size_of::<V2WALRecordType>() + std::mem::size_of::<u32>();
        match self {
            Self::NodeInsert { node_data, .. } => base_size + 8 + 8 + 4 + node_data.len(),
            Self::NodeUpdate {
                old_data, new_data, ..
            } => base_size + 8 + 8 + 4 + old_data.len() + 4 + new_data.len(),
            Self::NodeDelete {
                old_data,
                outgoing_edges,
                incoming_edges,
                ..
            } => {
                let outgoing_size: usize = outgoing_edges.iter().map(|e| e.serialized_size()).sum();
                let incoming_size: usize = incoming_edges.iter().map(|e| e.serialized_size()).sum();
                base_size + 8 + 8 + 4 + old_data.len() + 4 + outgoing_size + 4 + incoming_size
            }
            Self::ClusterCreate { edge_data, .. } => base_size + 8 + 1 + 8 + 4 + edge_data.len(),
            Self::EdgeInsert { edge_record, .. } => {
                base_size + 8 + 1 + edge_record.serialized_size() + 4
            }
            Self::EdgeUpdate {
                old_edge, new_edge, ..
            } => base_size + 8 + 1 + old_edge.serialized_size() + new_edge.serialized_size() + 4,
            Self::EdgeDelete { old_edge, .. } => base_size + 8 + 1 + old_edge.serialized_size() + 4,
            Self::StringInsert { string_value, .. } => base_size + 4 + string_value.len(),
            Self::FreeSpaceAllocate { .. } | Self::FreeSpaceDeallocate { .. } => {
                base_size + 8 + 4 + 1
            }
            Self::TransactionBegin { .. }
            | Self::TransactionCommit { .. }
            | Self::TransactionRollback { .. } => base_size + 8 + 8,
            Self::Checkpoint { .. } => base_size + 8 + 8,
            Self::HeaderUpdate {
                old_data, new_data, ..
            } => base_size + 8 + old_data.len() + new_data.len(),
            Self::SegmentEnd { .. } => base_size + 8 + 4,
            Self::TransactionPrepare {
                record_count: _, ..
            } => base_size + 8 + 8 + 8,
            Self::TransactionAbort { abort_reason, .. } => base_size + 8 + abort_reason.len(),
            Self::SavepointCreate { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
            Self::SavepointRollback { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
            Self::SavepointRelease { savepoint_id, .. } => base_size + 8 + savepoint_id.len(),
            Self::BackupCreate {
                backup_id,
                backup_path,
                ..
            } => base_size + backup_id.len() + backup_path.to_string_lossy().len(),
            Self::BackupRestore {
                backup_id,
                backup_path,
                target_path,
                ..
            } => {
                base_size
                    + backup_id.len()
                    + backup_path.to_string_lossy().len()
                    + target_path.to_string_lossy().len()
            }
            Self::LockAcquire { .. } | Self::LockRelease { .. } => base_size + 8 + 8 + 1,
            // Estimated at frame overhead only; these variants are not
            // handled by the serializer below anyway.
            Self::IndexUpdate { .. } | Self::StatisticsUpdate { .. } => base_size,
            Self::AllocateContiguous { .. } => base_size + 8 + 24 + 8, Self::CommitContiguous { .. } => base_size + 8 + 24, Self::RollbackContiguous { .. } => base_size + 24, Self::KvSet {
                key,
                value_bytes,
                ttl_seconds,
                ..
            } => {
                base_size
                    + 4
                    + key.len()
                    + 4
                    + value_bytes.len()
                    + 1
                    + 1
                    + (if ttl_seconds.is_some() { 8 } else { 0 })
                    + 8
            }
            Self::KvDelete {
                key,
                old_value_bytes,
                ..
            } => {
                base_size
                    + 4
                    + key.len()
                    + 1
                    + old_value_bytes.as_ref().map(|v| 4 + v.len()).unwrap_or(0)
                    + 8
            }
        }
    }
    /// True when replaying this record mutates persisted data.
    pub fn modifies_data(&self) -> bool {
        self.record_type().requires_checkpoint()
    }
    /// True for transaction begin/commit/rollback records.
    pub fn is_transaction_control(&self) -> bool {
        self.record_type().is_transaction_control()
    }
}
/// Errors that can arise while encoding or decoding WAL records.
#[derive(Debug, Clone)]
pub enum WALSerializationError {
    InvalidRecordType(u8),
    InsufficientData {
        expected: usize,
        actual: usize,
        record_type: V2WALRecordType,
    },
    CorruptedData { location: String, details: String },
    IoError(String),
    SizeOverflow,
}

impl std::fmt::Display for WALSerializationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use WALSerializationError as E;
        match self {
            E::InvalidRecordType(value) => write!(f, "Invalid WAL record type: {}", value),
            E::InsufficientData {
                expected,
                actual,
                record_type,
            } => write!(
                f,
                "Insufficient data for {:?}: expected {}, got {}",
                record_type, expected, actual
            ),
            E::CorruptedData { location, details } => {
                write!(f, "Corrupted WAL data at {}: {}", location, details)
            }
            E::IoError(message) => {
                write!(f, "I/O error during WAL serialization: {}", message)
            }
            E::SizeOverflow => f.write_str("Size overflow in WAL record data"),
        }
    }
}

impl std::error::Error for WALSerializationError {}
pub struct V2WALSerializer;
impl V2WALSerializer {
pub fn serialize(record: &V2WALRecord) -> NativeResult<Vec<u8>> {
let mut buffer = Vec::with_capacity(record.serialized_size());
buffer.push(record.record_type() as u8);
let size_pos = buffer.len();
buffer.extend_from_slice(&[0u8; 4]);
let data_start = buffer.len();
match record {
V2WALRecord::NodeInsert {
node_id,
slot_offset,
node_data,
} => {
buffer.extend_from_slice(&node_id.to_le_bytes());
buffer.extend_from_slice(&slot_offset.to_le_bytes());
buffer.extend_from_slice(&(node_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(node_data);
}
V2WALRecord::NodeUpdate {
node_id,
slot_offset,
old_data,
new_data,
} => {
buffer.extend_from_slice(&node_id.to_le_bytes());
buffer.extend_from_slice(&slot_offset.to_le_bytes());
buffer.extend_from_slice(&(old_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(old_data);
buffer.extend_from_slice(&(new_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(new_data);
}
V2WALRecord::NodeDelete {
node_id,
slot_offset,
old_data,
outgoing_edges,
incoming_edges,
} => {
buffer.extend_from_slice(&node_id.to_le_bytes());
buffer.extend_from_slice(&slot_offset.to_le_bytes());
buffer.extend_from_slice(&(old_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(old_data);
buffer.extend_from_slice(&(outgoing_edges.len() as u32).to_le_bytes());
for edge in outgoing_edges {
buffer.extend_from_slice(&edge.serialize());
}
buffer.extend_from_slice(&(incoming_edges.len() as u32).to_le_bytes());
for edge in incoming_edges {
buffer.extend_from_slice(&edge.serialize());
}
}
V2WALRecord::ClusterCreate {
node_id,
direction,
cluster_offset,
cluster_size,
edge_data,
} => {
buffer.extend_from_slice(&node_id.to_le_bytes());
buffer.push(*direction as u8);
buffer.extend_from_slice(&cluster_offset.to_le_bytes());
buffer.extend_from_slice(&cluster_size.to_le_bytes());
buffer.extend_from_slice(&(edge_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(edge_data);
}
V2WALRecord::EdgeInsert {
cluster_key,
edge_record,
insertion_point,
} => {
buffer.extend_from_slice(&cluster_key.0.to_le_bytes());
buffer.push(cluster_key.1 as u8);
buffer.extend_from_slice(&edge_record.as_bytes());
buffer.extend_from_slice(&insertion_point.to_le_bytes());
}
V2WALRecord::TransactionBegin { tx_id, timestamp } => {
buffer.extend_from_slice(&tx_id.to_le_bytes());
buffer.extend_from_slice(×tamp.to_le_bytes());
}
V2WALRecord::TransactionCommit { tx_id, timestamp } => {
buffer.extend_from_slice(&tx_id.to_le_bytes());
buffer.extend_from_slice(×tamp.to_le_bytes());
}
V2WALRecord::TransactionRollback { tx_id, timestamp } => {
buffer.extend_from_slice(&tx_id.to_le_bytes());
buffer.extend_from_slice(×tamp.to_le_bytes());
}
V2WALRecord::AllocateContiguous {
txn_id,
region,
timestamp,
} => {
buffer.extend_from_slice(&txn_id.to_le_bytes());
let region_bytes = region.serialize();
buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(®ion_bytes);
buffer.extend_from_slice(×tamp.to_le_bytes());
}
V2WALRecord::CommitContiguous { txn_id, region } => {
buffer.extend_from_slice(&txn_id.to_le_bytes());
let region_bytes = region.serialize();
buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(®ion_bytes);
}
V2WALRecord::RollbackContiguous { region } => {
let region_bytes = region.serialize();
buffer.extend_from_slice(&(region_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(®ion_bytes);
}
V2WALRecord::KvSet {
key,
value_bytes,
value_type,
ttl_seconds,
version,
} => {
buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
buffer.extend_from_slice(key);
buffer.extend_from_slice(&(value_bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(value_bytes);
buffer.push(*value_type);
match ttl_seconds {
Some(ttl) => {
buffer.push(1); buffer.extend_from_slice(&ttl.to_le_bytes());
}
None => {
buffer.push(0); }
}
buffer.extend_from_slice(&version.to_le_bytes());
}
V2WALRecord::KvDelete {
key,
old_value_bytes,
old_value_type,
old_version,
} => {
buffer.extend_from_slice(&(key.len() as u32).to_le_bytes());
buffer.extend_from_slice(key);
buffer.push(*old_value_type);
match old_value_bytes {
Some(value) => {
buffer.push(1); buffer.extend_from_slice(&(value.len() as u32).to_le_bytes());
buffer.extend_from_slice(value);
}
None => {
buffer.push(0); }
}
buffer.extend_from_slice(&old_version.to_le_bytes());
}
_ => {
return Err(NativeBackendError::CorruptStringTable {
reason: format!(
"WAL serialization error - unsupported record type: {:?}",
record.record_type()
),
});
}
}
let record_size = buffer.len() - data_start;
let size_bytes = (record_size as u32).to_le_bytes();
buffer[size_pos..size_pos + 4].copy_from_slice(&size_bytes);
Ok(buffer)
}
pub fn deserialize(data: &[u8]) -> NativeResult<V2WALRecord> {
if data.is_empty() {
return Err(NativeBackendError::CorruptStringTable {
reason: "WAL deserialization error - empty data buffer".to_string(),
});
}
let record_type = V2WALRecordType::try_from(data[0])?;
if data.len() < 5 {
return Err(NativeBackendError::CorruptStringTable {
reason: "WAL deserialization error - insufficient data for record size".to_string(),
});
}
let record_size = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
if data.len() < 5 + record_size {
return Err(NativeBackendError::CorruptStringTable {
reason: format!(
"WAL deserialization error - insufficient data: expected {}, got {}",
record_size + 5,
data.len()
),
});
}
let record_data = &data[5..5 + record_size];
match record_type {
V2WALRecordType::NodeInsert => {
if record_data.len() < 16 {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeInsert deserialization error - insufficient data for header"
.to_string(),
});
}
let node_id = i64::from_le_bytes(record_data[0..8].try_into().unwrap());
let slot_offset = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
if record_data.len() < 20 {
return Err(NativeBackendError::CorruptStringTable {
reason:
"NodeInsert deserialization error - insufficient data for size field"
.to_string(),
});
}
let data_len = u32::from_le_bytes(record_data[16..20].try_into().unwrap()) as usize;
if record_data.len() < 20 + data_len {
return Err(NativeBackendError::CorruptStringTable {
reason:
"NodeInsert deserialization error - insufficient data for node data"
.to_string(),
});
}
let node_data = record_data[20..20 + data_len].to_vec();
Ok(V2WALRecord::NodeInsert {
node_id,
slot_offset,
node_data,
})
}
V2WALRecordType::NodeDelete => {
if record_data.len() < 16 {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for header"
.to_string(),
});
}
let node_id = i64::from_le_bytes(record_data[0..8].try_into().unwrap());
let slot_offset = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
if record_data.len() < 20 {
return Err(NativeBackendError::CorruptStringTable {
reason:
"NodeDelete deserialization error - insufficient data for size field"
.to_string(),
});
}
let data_len = u32::from_le_bytes(record_data[16..20].try_into().unwrap()) as usize;
if record_data.len() < 20 + data_len {
return Err(NativeBackendError::CorruptStringTable {
reason:
"NodeDelete deserialization error - insufficient data for node data"
.to_string(),
});
}
let old_data = record_data[20..20 + data_len].to_vec();
let mut offset = 20 + data_len;
if record_data.len() < offset + 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for outgoing edge count".to_string(),
});
}
let outgoing_count =
u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
as usize;
offset += 4;
let mut outgoing_edges = Vec::with_capacity(outgoing_count);
for _ in 0..outgoing_count {
if record_data.len() < offset + 12 {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for outgoing edge header".to_string(),
});
}
let edge_data_len = u16::from_be_bytes(
record_data[offset + 10..offset + 12].try_into().unwrap(),
) as usize;
let edge_total_len = 12 + edge_data_len;
if record_data.len() < offset + edge_total_len {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for outgoing edge".to_string(),
});
}
let edge_bytes = &record_data[offset..offset + edge_total_len];
match CompactEdgeRecord::deserialize(edge_bytes) {
Ok(edge) => outgoing_edges.push(edge),
Err(e) => {
return Err(NativeBackendError::CorruptStringTable {
reason: format!(
"NodeDelete deserialization error - failed to deserialize outgoing edge: {:?}",
e
),
});
}
}
offset += edge_total_len;
}
if record_data.len() < offset + 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for incoming edge count".to_string(),
});
}
let incoming_count =
u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
as usize;
offset += 4;
let mut incoming_edges = Vec::with_capacity(incoming_count);
for _ in 0..incoming_count {
if record_data.len() < offset + 12 {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for incoming edge header".to_string(),
});
}
let edge_data_len = u16::from_be_bytes(
record_data[offset + 10..offset + 12].try_into().unwrap(),
) as usize;
let edge_total_len = 12 + edge_data_len;
if record_data.len() < offset + edge_total_len {
return Err(NativeBackendError::CorruptStringTable {
reason: "NodeDelete deserialization error - insufficient data for incoming edge".to_string(),
});
}
let edge_bytes = &record_data[offset..offset + edge_total_len];
match CompactEdgeRecord::deserialize(edge_bytes) {
Ok(edge) => incoming_edges.push(edge),
Err(e) => {
return Err(NativeBackendError::CorruptStringTable {
reason: format!(
"NodeDelete deserialization error - failed to deserialize incoming edge: {:?}",
e
),
});
}
}
offset += edge_total_len;
}
Ok(V2WALRecord::NodeDelete {
node_id,
slot_offset,
old_data,
outgoing_edges,
incoming_edges,
})
}
V2WALRecordType::TransactionBegin => {
if record_data.len() < 16 {
return Err(NativeBackendError::CorruptStringTable {
reason: "TransactionBegin deserialization error - insufficient data"
.to_string(),
});
}
let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
Ok(V2WALRecord::TransactionBegin { tx_id, timestamp })
}
V2WALRecordType::TransactionCommit => {
if record_data.len() < 16 {
return Err(NativeBackendError::CorruptStringTable {
reason: "TransactionCommit deserialization error - insufficient data"
.to_string(),
});
}
let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
Ok(V2WALRecord::TransactionCommit { tx_id, timestamp })
}
V2WALRecordType::TransactionRollback => {
if record_data.len() < 16 {
return Err(NativeBackendError::CorruptStringTable {
reason: "TransactionRollback deserialization error - insufficient data"
.to_string(),
});
}
let tx_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
let timestamp = u64::from_le_bytes(record_data[8..16].try_into().unwrap());
Ok(V2WALRecord::TransactionRollback { tx_id, timestamp })
}
V2WALRecordType::AllocateContiguous => {
if record_data.len() < 40 {
return Err(NativeBackendError::CorruptStringTable {
reason: format!(
"AllocateContiguous deserialization error - insufficient data: expected 40, got {}",
record_data.len()
),
});
}
let txn_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
let region_len =
u32::from_le_bytes(record_data[8..12].try_into().unwrap()) as usize;
let region_bytes = &record_data[12..12 + region_len];
let timestamp = u64::from_le_bytes(
record_data[12 + region_len..20 + region_len]
.try_into()
.unwrap(),
);
let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
NativeBackendError::CorruptStringTable {
reason: format!(
"AllocateContiguous deserialization error - invalid region: {}",
e
),
}
})?;
Ok(V2WALRecord::AllocateContiguous {
txn_id,
region,
timestamp,
})
}
V2WALRecordType::CommitContiguous => {
if record_data.len() < 12 {
return Err(NativeBackendError::CorruptStringTable {
reason: "CommitContiguous deserialization error - insufficient data"
.to_string(),
});
}
let txn_id = u64::from_le_bytes(record_data[0..8].try_into().unwrap());
let region_len =
u32::from_le_bytes(record_data[8..12].try_into().unwrap()) as usize;
if record_data.len() < 12 + region_len {
return Err(NativeBackendError::CorruptStringTable {
reason:
"CommitContiguous deserialization error - insufficient data for region"
.to_string(),
});
}
let region_bytes = &record_data[12..12 + region_len];
let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
NativeBackendError::CorruptStringTable {
reason: format!(
"CommitContiguous deserialization error - invalid region: {}",
e
),
}
})?;
Ok(V2WALRecord::CommitContiguous { txn_id, region })
}
V2WALRecordType::RollbackContiguous => {
if record_data.len() < 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "RollbackContiguous deserialization error - insufficient data"
.to_string(),
});
}
let region_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;
if record_data.len() < 4 + region_len {
return Err(NativeBackendError::CorruptStringTable {
reason: "RollbackContiguous deserialization error - insufficient data for region".to_string(),
});
}
let region_bytes = &record_data[4..4 + region_len];
let region = ContiguousRegion::deserialize(region_bytes).map_err(|e| {
NativeBackendError::CorruptStringTable {
reason: format!(
"RollbackContiguous deserialization error - invalid region: {}",
e
),
}
})?;
Ok(V2WALRecord::RollbackContiguous { region })
}
V2WALRecordType::KvSet => {
if record_data.len() < 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "KvSet deserialization error - insufficient data for key length"
.to_string(),
});
}
let key_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;
if record_data.len() < 4 + key_len + 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "KvSet deserialization error - insufficient data for key and value length".to_string(),
});
}
let key = record_data[4..4 + key_len].to_vec();
let offset = 4 + key_len;
let value_len =
u32::from_le_bytes(record_data[offset..offset + 4].try_into().unwrap())
as usize;
if record_data.len() < offset + 4 + value_len + 1 + 1 + 8 {
return Err(NativeBackendError::CorruptStringTable {
reason:
"KvSet deserialization error - insufficient data for value and metadata"
.to_string(),
});
}
let value_bytes = record_data[offset + 4..offset + 4 + value_len].to_vec();
let offset = offset + 4 + value_len;
let value_type = record_data[offset];
let offset = offset + 1;
let ttl_flag = record_data[offset];
let ttl_seconds = if ttl_flag == 1 {
if record_data.len() < offset + 1 + 8 {
return Err(NativeBackendError::CorruptStringTable {
reason: "KvSet deserialization error - insufficient data for TTL"
.to_string(),
});
}
let ttl = u64::from_le_bytes(
record_data[offset + 1..offset + 1 + 8].try_into().unwrap(),
);
Some(ttl)
} else {
None
};
let offset = offset + 1 + ttl_seconds.map_or(0, |_| 8);
if record_data.len() < offset + 8 {
return Err(NativeBackendError::CorruptStringTable {
reason: "KvSet deserialization error - insufficient data for version"
.to_string(),
});
}
let version =
u64::from_le_bytes(record_data[offset..offset + 8].try_into().unwrap());
Ok(V2WALRecord::KvSet {
key,
value_bytes,
value_type,
ttl_seconds,
version,
})
}
V2WALRecordType::KvDelete => {
if record_data.len() < 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "KvDelete deserialization error - insufficient data for key length"
.to_string(),
});
}
let key_len = u32::from_le_bytes(record_data[0..4].try_into().unwrap()) as usize;
if record_data.len() < 4 + key_len + 1 {
return Err(NativeBackendError::CorruptStringTable {
reason:
"KvDelete deserialization error - insufficient data for key and type"
.to_string(),
});
}
let key = record_data[4..4 + key_len].to_vec();
let offset = 4 + key_len;
let old_value_type = record_data[offset];
let offset = offset + 1;
if record_data.len() < offset + 1 {
return Err(NativeBackendError::CorruptStringTable {
reason:
"KvDelete deserialization error - insufficient data for old value flag"
.to_string(),
});
}
let old_value_flag = record_data[offset];
let old_value_bytes = if old_value_flag == 1 {
if record_data.len() < offset + 1 + 4 {
return Err(NativeBackendError::CorruptStringTable {
reason: "KvDelete deserialization error - insufficient data for old value length".to_string(),
});
}
let old_value_len = u32::from_le_bytes(
record_data[offset + 1..offset + 1 + 4].try_into().unwrap(),
) as usize;
if record_data.len() < offset + 1 + 4 + old_value_len {
return Err(NativeBackendError::CorruptStringTable {
reason:
"KvDelete deserialization error - insufficient data for old value"
.to_string(),
});
}
let value =
record_data[offset + 1 + 4..offset + 1 + 4 + old_value_len].to_vec();
Some(value)
} else {
None
};
let offset = offset + 1 + old_value_bytes.as_ref().map_or(0, |v| 4 + v.len());
if record_data.len() < offset + 8 {
return Err(NativeBackendError::CorruptStringTable {
reason:
"KvDelete deserialization error - insufficient data for old version"
.to_string(),
});
}
let old_version =
u64::from_le_bytes(record_data[offset..offset + 8].try_into().unwrap());
Ok(V2WALRecord::KvDelete {
key,
old_value_bytes,
old_value_type,
old_version,
})
}
_ => Err(NativeBackendError::CorruptStringTable {
reason: format!(
"WAL deserialization error - unsupported record type: {:?}",
record_type
),
}),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // Classification helpers: data-modifying vs transaction-control types.
    #[test]
    fn test_record_type_properties() {
        assert!(V2WALRecordType::NodeInsert.requires_checkpoint());
        assert!(V2WALRecordType::TransactionBegin.is_transaction_control());
        assert!(!V2WALRecordType::TransactionBegin.requires_checkpoint());
    }
    // Node records expose their node_id as the cluster key; control records do not.
    #[test]
    fn test_v2_wal_record_cluster_key() {
        let record = V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3],
        };
        assert_eq!(record.cluster_key(), Some(42));
        let record = V2WALRecord::TransactionBegin {
            tx_id: 100,
            timestamp: 123456,
        };
        assert_eq!(record.cluster_key(), None);
    }
    // serialize -> deserialize must reproduce the NodeInsert payload exactly.
    #[test]
    fn test_record_serialization_roundtrip() {
        let original = V2WALRecord::NodeInsert {
            node_id: 123,
            slot_offset: 4096,
            node_data: vec![4, 5, 6, 7, 8],
        };
        let serialized = V2WALSerializer::serialize(&original).unwrap();
        let deserialized = V2WALSerializer::deserialize(&serialized).unwrap();
        match (original, deserialized) {
            (
                V2WALRecord::NodeInsert {
                    node_id: id1,
                    slot_offset: off1,
                    node_data: data1,
                },
                V2WALRecord::NodeInsert {
                    node_id: id2,
                    slot_offset: off2,
                    node_data: data2,
                },
            ) => {
                assert_eq!(id1, id2);
                assert_eq!(off1, off2);
                assert_eq!(data1, data2);
            }
            _ => panic!("Record type mismatch after roundtrip"),
        }
    }
    // serialized_size is a capacity hint; it must not under-estimate for NodeInsert.
    #[test]
    fn test_serialized_size_estimation() {
        let record = V2WALRecord::NodeInsert {
            node_id: 42,
            slot_offset: 1024,
            node_data: vec![1, 2, 3, 4, 5],
        };
        let estimated = record.serialized_size();
        let serialized = V2WALSerializer::serialize(&record).unwrap();
        assert!(estimated >= serialized.len());
    }
    // KvSet roundtrip including the optional TTL field (present case).
    #[test]
    fn test_kv_set_serialization_roundtrip() {
        let original = V2WALRecord::KvSet {
            key: b"test_key".to_vec(),
            value_bytes: b"test_value".to_vec(),
            value_type: 1,
            ttl_seconds: Some(3600),
            version: 12345,
        };
        let serialized = V2WALSerializer::serialize(&original).unwrap();
        let deserialized = V2WALSerializer::deserialize(&serialized).unwrap();
        match (original, deserialized) {
            (
                V2WALRecord::KvSet {
                    key: k1,
                    value_bytes: v1,
                    value_type: t1,
                    ttl_seconds: ttl1,
                    version: ver1,
                },
                V2WALRecord::KvSet {
                    key: k2,
                    value_bytes: v2,
                    value_type: t2,
                    ttl_seconds: ttl2,
                    version: ver2,
                },
            ) => {
                assert_eq!(k1, k2);
                assert_eq!(v1, v2);
                assert_eq!(t1, t2);
                assert_eq!(ttl1, ttl2);
                assert_eq!(ver1, ver2);
            }
            _ => panic!("Record type mismatch after KV set roundtrip"),
        }
    }
    // KvDelete roundtrip with the optional old value present.
    #[test]
    fn test_kv_delete_serialization_roundtrip() {
        let original = V2WALRecord::KvDelete {
            key: b"test_key".to_vec(),
            old_value_bytes: Some(b"old_value".to_vec()),
            old_value_type: 1,
            old_version: 12344,
        };
        let serialized = V2WALSerializer::serialize(&original).unwrap();
        let deserialized = V2WALSerializer::deserialize(&serialized).unwrap();
        match (original, deserialized) {
            (
                V2WALRecord::KvDelete {
                    key: k1,
                    old_value_bytes: v1,
                    old_value_type: t1,
                    old_version: ver1,
                },
                V2WALRecord::KvDelete {
                    key: k2,
                    old_value_bytes: v2,
                    old_value_type: t2,
                    old_version: ver2,
                },
            ) => {
                assert_eq!(k1, k2);
                assert_eq!(v1, v2);
                assert_eq!(t1, t2);
                assert_eq!(ver1, ver2);
            }
            _ => panic!("Record type mismatch after KV delete roundtrip"),
        }
    }
    // The TTL "absent" flag byte must survive the roundtrip as None.
    #[test]
    fn test_kv_set_without_ttl() {
        let original = V2WALRecord::KvSet {
            key: b"no_ttl_key".to_vec(),
            value_bytes: b"value".to_vec(),
            value_type: 0,
            ttl_seconds: None,
            version: 1,
        };
        let serialized = V2WALSerializer::serialize(&original).unwrap();
        let deserialized = V2WALSerializer::deserialize(&serialized).unwrap();
        match deserialized {
            V2WALRecord::KvSet { ttl_seconds, .. } => {
                assert_eq!(ttl_seconds, None);
            }
            _ => panic!("Wrong record type"),
        }
    }
    // The old-value "absent" flag byte must survive the roundtrip as None.
    #[test]
    fn test_kv_delete_no_old_value() {
        let original = V2WALRecord::KvDelete {
            key: b"no_old_value".to_vec(),
            old_value_bytes: None,
            old_value_type: 0,
            old_version: 0,
        };
        let serialized = V2WALSerializer::serialize(&original).unwrap();
        let deserialized = V2WALSerializer::deserialize(&serialized).unwrap();
        match deserialized {
            V2WALRecord::KvDelete {
                old_value_bytes, ..
            } => {
                assert_eq!(old_value_bytes, None);
            }
            _ => panic!("Wrong record type"),
        }
    }
}