#[cfg(test)]
use super::entry::WalEntry;
use super::entry::{LSN, WalOperation};
use crate::core::error::Result;
use crate::core::interning::InternedString;
use crate::core::temporal::Timestamp;
// One-byte operation tags. `serialize_operation_into` pushes the matching tag as the
// first payload byte for each `WalOperation` variant.
// NOTE(review): the numbering is not grouped by kind (checkpoint is 5, the deletes are
// 6 and 7). Changing any value changes the serialized bytes; presumably the WAL reader
// matches on these same values — confirm before editing.
/// Tag written for `WalOperation::CreateNode`.
pub(crate) const OP_CREATE_NODE: u8 = 1;
/// Tag written for `WalOperation::CreateEdge`.
pub(crate) const OP_CREATE_EDGE: u8 = 2;
/// Tag written for `WalOperation::UpdateNode`.
pub(crate) const OP_UPDATE_NODE: u8 = 3;
/// Tag written for `WalOperation::UpdateEdge`.
pub(crate) const OP_UPDATE_EDGE: u8 = 4;
/// Tag written for `WalOperation::Checkpoint`.
pub(crate) const OP_CHECKPOINT: u8 = 5;
/// Tag written for `WalOperation::DeleteNode`.
pub(crate) const OP_DELETE_NODE: u8 = 6;
/// Tag written for `WalOperation::DeleteEdge`.
pub(crate) const OP_DELETE_EDGE: u8 = 7;
/// Appends the `u32` id of the interned string `s` to `buffer` in little-endian
/// byte order. Only the id is written, not the string contents.
#[inline(always)]
fn serialize_interned_string(s: InternedString, buffer: &mut Vec<u8>) {
    let id_bytes = s.as_u32().to_le_bytes();
    buffer.extend_from_slice(&id_bytes);
}
/// Estimates, in bytes, how much buffer space a serialized WAL entry for
/// `operation` needs.
///
/// The result is the fixed entry header (LSN, entry timestamp and checksum)
/// plus a per-variant payload budget. Variants that carry properties add
/// `properties.serialized_size()` on top of their fixed fields.
pub(crate) fn estimate_entry_capacity(operation: &WalOperation) -> usize {
    // Entry header: 8-byte LSN + timestamp (budgeted at 12 bytes) + 4-byte checksum.
    const ENTRY_HEADER: usize = 24;
    // Sizes budgeted for the individual payload fields.
    const OPCODE: usize = 1;
    const ID: usize = 8;
    const LSN_FIELD: usize = 8;
    const LABEL: usize = 4;
    const TIMESTAMP: usize = 12;

    let payload = match operation {
        // opcode + node id + label + valid_from + properties
        WalOperation::CreateNode { properties, .. } => {
            OPCODE + ID + LABEL + TIMESTAMP + properties.serialized_size()
        }
        // opcode + edge id + source + target + label + valid_from + properties
        WalOperation::CreateEdge { properties, .. } => {
            OPCODE + ID + ID + ID + LABEL + TIMESTAMP + properties.serialized_size()
        }
        // opcode + node id + version id + label + valid_from + properties
        WalOperation::UpdateNode { properties, .. } => {
            OPCODE + ID + ID + LABEL + TIMESTAMP + properties.serialized_size()
        }
        // opcode + edge id + version id + label + valid_from + properties
        WalOperation::UpdateEdge { properties, .. } => {
            OPCODE + ID + ID + LABEL + TIMESTAMP + properties.serialized_size()
        }
        // opcode + id + valid_from
        WalOperation::DeleteNode { .. } => OPCODE + ID + TIMESTAMP,
        WalOperation::DeleteEdge { .. } => OPCODE + ID + TIMESTAMP,
        // opcode + checkpoint LSN + checkpoint timestamp
        WalOperation::Checkpoint { .. } => OPCODE + LSN_FIELD + TIMESTAMP,
    };

    ENTRY_HEADER + payload
}
/// Appends one serialized WAL entry for `operation` to the end of `buffer`.
///
/// Bytes appended, in order:
/// 1. `lsn` as 8 little-endian bytes,
/// 2. `timestamp`, as written by its own `serialize_into`,
/// 3. a 4-byte little-endian CRC32 checksum,
/// 4. the operation payload: a one-byte opcode followed by the variant's fields.
///
/// The checksum covers every byte of *this entry* except the checksum field
/// itself. Bytes that were already in `buffer` before the call are left
/// untouched and are not hashed, so the checksum is the same whether the
/// entry is written into an empty buffer or appended after other data.
///
/// # Errors
///
/// Propagates any error from serializing the operation's properties. On error
/// `buffer` is truncated back to the length it had on entry, so no partially
/// written entry is left behind.
pub(crate) fn serialize_operation_into(
    lsn: LSN,
    timestamp: Timestamp,
    operation: &WalOperation,
    buffer: &mut Vec<u8>,
) -> Result<()> {
    const CHECKSUM_LEN: usize = 4;

    // Remember where this entry begins; the buffer may already contain data.
    let entry_start = buffer.len();

    buffer.extend_from_slice(&lsn.0.to_le_bytes());
    timestamp.serialize_into(buffer);

    // Reserve the checksum slot now and patch it once the payload is known.
    let checksum_offset = buffer.len();
    buffer.extend_from_slice(&[0u8; CHECKSUM_LEN]);
    let payload_offset = checksum_offset + CHECKSUM_LEN;

    if let Err(err) = write_operation_payload(operation, buffer) {
        // Roll back the half-written entry so the caller's buffer stays consistent.
        buffer.truncate(entry_start);
        return Err(err);
    }

    // Hash header and payload of this entry only, skipping the checksum slot.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&buffer[entry_start..checksum_offset]);
    hasher.update(&buffer[payload_offset..]);
    let checksum = hasher.finalize();

    buffer[checksum_offset..payload_offset].copy_from_slice(&checksum.to_le_bytes());
    Ok(())
}

/// Writes the opcode byte and the fields of `operation` to the end of `buffer`.
fn write_operation_payload(operation: &WalOperation, buffer: &mut Vec<u8>) -> Result<()> {
    match operation {
        WalOperation::CreateNode {
            node_id,
            label,
            properties,
            valid_from,
        } => {
            buffer.push(OP_CREATE_NODE);
            buffer.extend_from_slice(&node_id.as_u64().to_le_bytes());
            serialize_interned_string(*label, buffer);
            properties.serialize_into(buffer)?;
            valid_from.serialize_into(buffer);
        }
        WalOperation::CreateEdge {
            edge_id,
            source,
            target,
            label,
            properties,
            valid_from,
        } => {
            buffer.push(OP_CREATE_EDGE);
            buffer.extend_from_slice(&edge_id.as_u64().to_le_bytes());
            buffer.extend_from_slice(&source.as_u64().to_le_bytes());
            buffer.extend_from_slice(&target.as_u64().to_le_bytes());
            serialize_interned_string(*label, buffer);
            properties.serialize_into(buffer)?;
            valid_from.serialize_into(buffer);
        }
        WalOperation::UpdateNode {
            node_id,
            version_id,
            label,
            properties,
            valid_from,
        } => {
            buffer.push(OP_UPDATE_NODE);
            buffer.extend_from_slice(&node_id.as_u64().to_le_bytes());
            buffer.extend_from_slice(&version_id.as_u64().to_le_bytes());
            serialize_interned_string(*label, buffer);
            properties.serialize_into(buffer)?;
            valid_from.serialize_into(buffer);
        }
        WalOperation::UpdateEdge {
            edge_id,
            version_id,
            label,
            properties,
            valid_from,
        } => {
            buffer.push(OP_UPDATE_EDGE);
            buffer.extend_from_slice(&edge_id.as_u64().to_le_bytes());
            buffer.extend_from_slice(&version_id.as_u64().to_le_bytes());
            serialize_interned_string(*label, buffer);
            properties.serialize_into(buffer)?;
            valid_from.serialize_into(buffer);
        }
        WalOperation::DeleteNode {
            node_id,
            valid_from,
        } => {
            buffer.push(OP_DELETE_NODE);
            buffer.extend_from_slice(&node_id.as_u64().to_le_bytes());
            valid_from.serialize_into(buffer);
        }
        WalOperation::DeleteEdge {
            edge_id,
            valid_from,
        } => {
            buffer.push(OP_DELETE_EDGE);
            buffer.extend_from_slice(&edge_id.as_u64().to_le_bytes());
            valid_from.serialize_into(buffer);
        }
        WalOperation::Checkpoint { lsn, timestamp } => {
            buffer.push(OP_CHECKPOINT);
            buffer.extend_from_slice(&lsn.0.to_le_bytes());
            timestamp.serialize_into(buffer);
        }
    }
    Ok(())
}
/// Test-only convenience wrapper: serializes `entry` by forwarding its LSN,
/// timestamp and operation to `serialize_operation_into`.
#[cfg(test)]
pub(crate) fn serialize_entry_into(entry: &WalEntry, buffer: &mut Vec<u8>) -> Result<()> {
    let (lsn, timestamp) = (entry.lsn, entry.timestamp);
    serialize_operation_into(lsn, timestamp, &entry.operation, buffer)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::EdgeId;
    use crate::core::NodeId;
    use crate::core::interning::GLOBAL_INTERNER;
    use crate::core::property::PropertyMapBuilder;
    use crate::core::temporal::Timestamp;
    use crate::storage::wal::entry::LSN;

    /// Fixed timestamp shared by every test in this module.
    fn test_timestamp() -> Timestamp {
        use crate::core::hlc::HybridTimestamp;
        HybridTimestamp::new_unchecked(1000000, 0)
    }

    /// Serializes `op` as the entry with LSN 1, asserts that the serialized
    /// length does not exceed `estimated`, and returns that length.
    #[track_caller]
    fn assert_fits_estimate(op: WalOperation, estimated: usize) -> usize {
        let entry = WalEntry::new(LSN(1), op);
        let mut bytes = Vec::new();
        serialize_entry_into(&entry, &mut bytes).unwrap();
        let actual = bytes.len();
        assert!(
            actual <= estimated,
            "Actual size {} should not exceed estimate {}",
            actual,
            estimated
        );
        actual
    }

    /// Asserts that `estimated` is at most 1.5 times `actual`.
    #[track_caller]
    fn assert_estimate_is_tight(estimated: usize, actual: usize) {
        let overhead_ratio = estimated as f64 / actual as f64;
        assert!(
            overhead_ratio <= 1.5,
            "Estimate {} should not be more than 50% over actual size {}",
            estimated,
            actual
        );
    }

    #[test]
    fn test_estimate_capacity_checkpoint() {
        let op = WalOperation::Checkpoint {
            lsn: LSN(1),
            timestamp: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        assert_eq!(estimated, 45, "Checkpoint should be exactly 45 bytes");
        assert_fits_estimate(op, estimated);
    }

    #[test]
    fn test_estimate_capacity_delete_node() {
        let op = WalOperation::DeleteNode {
            node_id: NodeId::new(1).unwrap(),
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        assert_eq!(estimated, 45, "DeleteNode should be exactly 45 bytes");
        assert_fits_estimate(op, estimated);
    }

    #[test]
    fn test_estimate_capacity_delete_edge() {
        let op = WalOperation::DeleteEdge {
            edge_id: EdgeId::new(1).unwrap(),
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        assert_eq!(estimated, 45, "DeleteEdge should be exactly 45 bytes");
        assert_fits_estimate(op, estimated);
    }

    #[test]
    fn test_estimate_capacity_create_node_empty_properties() {
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("test").unwrap(),
            properties: PropertyMapBuilder::new().build(),
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        assert_eq!(
            estimated, 53,
            "CreateNode with empty properties should be 53 bytes"
        );
        assert_fits_estimate(op, estimated);
    }

    #[test]
    fn test_estimate_capacity_create_node_with_properties() {
        let properties = PropertyMapBuilder::new()
            .insert("name", "Alice")
            .insert("age", 30)
            .insert("score", 95.5)
            .build();
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("Person").unwrap(),
            properties,
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        let actual = assert_fits_estimate(op, estimated);
        assert_estimate_is_tight(estimated, actual);
    }

    #[test]
    fn test_estimate_capacity_create_edge() {
        let properties = PropertyMapBuilder::new()
            .insert("weight", 1.5)
            .insert("type", "FRIEND")
            .build();
        let op = WalOperation::CreateEdge {
            edge_id: EdgeId::new(1).unwrap(),
            source: NodeId::new(1).unwrap(),
            target: NodeId::new(2).unwrap(),
            label: GLOBAL_INTERNER.intern("KNOWS").unwrap(),
            properties,
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        let actual = assert_fits_estimate(op, estimated);
        assert_estimate_is_tight(estimated, actual);
    }

    #[test]
    fn test_estimate_capacity_with_vector_property() {
        let embedding = vec![0.1, 0.2, 0.3, 0.4];
        let properties = PropertyMapBuilder::new()
            .insert_vector("embedding", &embedding)
            .build();
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("Document").unwrap(),
            properties,
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        let actual = assert_fits_estimate(op, estimated);
        assert_estimate_is_tight(estimated, actual);
    }

    #[test]
    fn test_estimate_capacity_large_properties() {
        // Fifty integer properties named key_0 .. key_49.
        let mut builder = PropertyMapBuilder::new();
        for index in 0..50 {
            builder = builder.insert(&format!("key_{}", index), index);
        }
        let op = WalOperation::UpdateNode {
            node_id: NodeId::new(1).unwrap(),
            version_id: crate::core::VersionId::new(1).unwrap(),
            label: GLOBAL_INTERNER.intern("LargeNode").unwrap(),
            properties: builder.build(),
            valid_from: test_timestamp(),
        };
        let estimated = estimate_entry_capacity(&op);
        let actual = assert_fits_estimate(op, estimated);
        assert_estimate_is_tight(estimated, actual);
    }
}
#[cfg(test)]
mod prop_tests {
    use super::*;
    use crate::core::hlc::HybridTimestamp;
    use crate::core::id::NodeId;
    use crate::core::interning::GLOBAL_INTERNER;
    use crate::core::property::{PropertyMap, PropertyMapBuilder, PropertyValue};
    use proptest::prelude::*;

    /// Interned strings of 1 to 10 characters drawn from letters, digits and `_`.
    fn arb_interned_string() -> impl Strategy<Value = InternedString> {
        "[a-zA-Z0-9_]{1,10}".prop_map(|text| GLOBAL_INTERNER.intern(&text).unwrap())
    }

    /// Null, bool, int, float or short alphanumeric string property values.
    fn arb_property_value() -> impl Strategy<Value = PropertyValue> {
        prop_oneof![
            Just(PropertyValue::Null),
            any::<bool>().prop_map(PropertyValue::Bool),
            any::<i64>().prop_map(PropertyValue::Int),
            any::<f64>().prop_map(PropertyValue::Float),
            "[a-zA-Z0-9]{0,20}".prop_map(|text| PropertyValue::string(&text)),
        ]
    }

    /// Property maps built from 0 to 9 generated `(key, value)` pairs,
    /// inserted in generation order.
    fn arb_property_map() -> impl Strategy<Value = PropertyMap> {
        let pair = ("[a-z]{1,10}", arb_property_value());
        prop::collection::vec(pair, 0..10).prop_map(|pairs| {
            pairs
                .into_iter()
                .fold(PropertyMapBuilder::new(), |builder, (key, value)| {
                    builder.insert(&key, value)
                })
                .build()
        })
    }

    /// Timestamps built from an arbitrary `i64` first component and a zero second one.
    fn arb_timestamp() -> impl Strategy<Value = Timestamp> {
        any::<i64>().prop_map(|raw| HybridTimestamp::new_unchecked(raw, 0))
    }

    /// Node ids built from raw values in `1..u64::MAX`.
    fn arb_node_id() -> impl Strategy<Value = NodeId> {
        (1u64..u64::MAX).prop_map(|raw| NodeId::new(raw).unwrap())
    }

    /// Generates `CreateNode`, `DeleteNode` and `Checkpoint` operations.
    fn arb_wal_operation() -> impl Strategy<Value = WalOperation> {
        let create_node = (
            arb_node_id(),
            arb_interned_string(),
            arb_property_map(),
            arb_timestamp(),
        )
            .prop_map(
                |(node_id, label, properties, valid_from)| WalOperation::CreateNode {
                    node_id,
                    label,
                    properties,
                    valid_from,
                },
            );

        let delete_node = (arb_node_id(), arb_timestamp()).prop_map(|(node_id, valid_from)| {
            WalOperation::DeleteNode {
                node_id,
                valid_from,
            }
        });

        let checkpoint = (any::<u64>().prop_map(LSN), arb_timestamp())
            .prop_map(|(lsn, timestamp)| WalOperation::Checkpoint { lsn, timestamp });

        prop_oneof![create_node, delete_node, checkpoint]
    }

    proptest! {
        /// The capacity estimate must never be smaller than the serialized entry,
        /// and for entries above 100 bytes it must stay within twice the actual size.
        #[test]
        fn test_estimate_capacity_is_upper_bound(
            op in arb_wal_operation(),
            lsn_val in any::<u64>()
        ) {
            let estimated = estimate_entry_capacity(&op);

            let mut bytes = Vec::new();
            let entry_timestamp = HybridTimestamp::new_unchecked(1000, 0);
            serialize_operation_into(LSN(lsn_val), entry_timestamp, &op, &mut bytes).unwrap();
            let actual = bytes.len();

            prop_assert!(estimated >= actual, "Estimate {} < Actual {}", estimated, actual);
            if actual > 100 {
                prop_assert!(estimated <= actual * 2, "Estimate {} > 2x Actual {}", estimated, actual);
            }
        }
    }
}