use crate::error::EngineError;
use crate::types::*;
use std::collections::BTreeMap;
use std::io::{Cursor, Read};
fn write_u8(buf: &mut Vec<u8>, v: u8) {
buf.push(v);
}
fn write_u16(buf: &mut Vec<u8>, v: u16) {
buf.extend_from_slice(&v.to_le_bytes());
}
fn write_u32(buf: &mut Vec<u8>, v: u32) {
buf.extend_from_slice(&v.to_le_bytes());
}
fn write_u64(buf: &mut Vec<u8>, v: u64) {
buf.extend_from_slice(&v.to_le_bytes());
}
fn write_i64(buf: &mut Vec<u8>, v: i64) {
buf.extend_from_slice(&v.to_le_bytes());
}
fn write_f32(buf: &mut Vec<u8>, v: f32) {
buf.extend_from_slice(&v.to_le_bytes());
}
fn write_sparse_vector_entry(buf: &mut Vec<u8>, dimension_id: u32, weight: f32) {
write_u32(buf, dimension_id);
write_f32(buf, weight);
}
const MAX_BYTES_LEN: usize = 64 * 1024 * 1024;
fn write_bytes(buf: &mut Vec<u8>, data: &[u8]) -> Result<(), EngineError> {
if data.len() > MAX_BYTES_LEN {
return Err(EngineError::SerializationError(format!(
"bytes payload too large: {} bytes (max {})",
data.len(),
MAX_BYTES_LEN
)));
}
write_u32(buf, data.len() as u32);
buf.extend_from_slice(data);
Ok(())
}
fn write_str(buf: &mut Vec<u8>, s: &str) -> Result<(), EngineError> {
let bytes = s.as_bytes();
if bytes.len() > u16::MAX as usize {
return Err(EngineError::SerializationError(format!(
"string too long: {} bytes (max {})",
bytes.len(),
u16::MAX
)));
}
write_u16(buf, bytes.len() as u16);
buf.extend_from_slice(bytes);
Ok(())
}
fn read_u8(cursor: &mut Cursor<&[u8]>) -> Result<u8, EngineError> {
let mut buf = [0u8; 1];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading u8".into()))?;
Ok(buf[0])
}
fn read_u16(cursor: &mut Cursor<&[u8]>) -> Result<u16, EngineError> {
let mut buf = [0u8; 2];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading u16".into()))?;
Ok(u16::from_le_bytes(buf))
}
fn read_u32(cursor: &mut Cursor<&[u8]>) -> Result<u32, EngineError> {
let mut buf = [0u8; 4];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading u32".into()))?;
Ok(u32::from_le_bytes(buf))
}
fn read_u64(cursor: &mut Cursor<&[u8]>) -> Result<u64, EngineError> {
let mut buf = [0u8; 8];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading u64".into()))?;
Ok(u64::from_le_bytes(buf))
}
fn read_i64(cursor: &mut Cursor<&[u8]>) -> Result<i64, EngineError> {
let mut buf = [0u8; 8];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading i64".into()))?;
Ok(i64::from_le_bytes(buf))
}
fn read_f32(cursor: &mut Cursor<&[u8]>) -> Result<f32, EngineError> {
let mut buf = [0u8; 4];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading f32".into()))?;
Ok(f32::from_le_bytes(buf))
}
fn remaining_bytes(cursor: &Cursor<&[u8]>) -> usize {
cursor
.get_ref()
.len()
.saturating_sub(cursor.position() as usize)
}
fn read_bytes(cursor: &mut Cursor<&[u8]>) -> Result<Vec<u8>, EngineError> {
let len = read_u32(cursor)? as usize;
if len > MAX_BYTES_LEN {
return Err(EngineError::CorruptRecord(format!(
"bytes payload too large: {} (max {})",
len, MAX_BYTES_LEN
)));
}
let mut buf = vec![0u8; len];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading bytes".into()))?;
Ok(buf)
}
fn read_str(cursor: &mut Cursor<&[u8]>) -> Result<String, EngineError> {
let len = read_u16(cursor)? as usize;
let mut buf = vec![0u8; len];
cursor
.read_exact(&mut buf)
.map_err(|_| EngineError::CorruptRecord("unexpected EOF reading string".into()))?;
String::from_utf8(buf).map_err(|_| EngineError::CorruptRecord("invalid UTF-8 in string".into()))
}
fn read_node_label_set(cursor: &mut Cursor<&[u8]>) -> Result<NodeLabelSet, EngineError> {
let count = read_u8(cursor)? as usize;
if count == 0 {
return Err(EngineError::CorruptRecord(
"node WAL label_count must be at least 1".into(),
));
}
if count > MAX_NODE_LABELS_PER_NODE {
return Err(EngineError::CorruptRecord(format!(
"node WAL label_count {} exceeds maximum {}",
count, MAX_NODE_LABELS_PER_NODE
)));
}
let mut ids = [0u32; MAX_NODE_LABELS_PER_NODE];
for index in 0..count {
ids[index] = read_u32(cursor)?;
if index > 0 && ids[index - 1] >= ids[index] {
return Err(EngineError::CorruptRecord(
"node WAL label IDs must be sorted ascending and unique".into(),
));
}
}
NodeLabelSet::from_canonical_ids(&ids[..count])
.map_err(|err| EngineError::CorruptRecord(format!("invalid node WAL label set: {err}")))
}
pub(crate) fn encode_wal_op_into(op: &WalOp, buf: &mut Vec<u8>) -> Result<(), EngineError> {
buf.clear();
match op {
WalOp::UpsertNode(node) => {
write_u8(buf, OpTag::UpsertNode as u8);
write_u64(buf, node.id);
write_u8(buf, node.label_ids.len() as u8);
for &label_id in node.label_ids.as_slice() {
write_u32(buf, label_id);
}
write_str(buf, &node.key)?;
write_i64(buf, node.created_at);
write_i64(buf, node.updated_at);
write_f32(buf, node.weight);
let props_bytes = rmp_serde::to_vec(&node.props)
.map_err(|e| EngineError::SerializationError(e.to_string()))?;
write_bytes(buf, &props_bytes)?;
let mut flags = 0u8;
if node.dense_vector.is_some() {
flags |= 0b0000_0001;
}
if node.sparse_vector.is_some() {
flags |= 0b0000_0010;
}
write_u8(buf, flags);
if let Some(dense) = &node.dense_vector {
write_u32(buf, dense.len() as u32);
for &value in dense {
write_f32(buf, value);
}
}
if let Some(sparse) = &node.sparse_vector {
write_u32(buf, sparse.len() as u32);
for &(dimension_id, weight) in sparse {
write_sparse_vector_entry(buf, dimension_id, weight);
}
}
}
WalOp::UpsertEdge(edge) => {
write_u8(buf, OpTag::UpsertEdge as u8);
write_u64(buf, edge.id);
write_u64(buf, edge.from);
write_u64(buf, edge.to);
write_u32(buf, edge.label_id);
write_i64(buf, edge.created_at);
write_i64(buf, edge.updated_at);
write_f32(buf, edge.weight);
let props_bytes = rmp_serde::to_vec(&edge.props)
.map_err(|e| EngineError::SerializationError(e.to_string()))?;
write_bytes(buf, &props_bytes)?;
write_i64(buf, edge.valid_from);
write_i64(buf, edge.valid_to);
}
WalOp::DeleteNode { id, deleted_at } => {
write_u8(buf, OpTag::DeleteNode as u8);
write_u64(buf, *id);
write_i64(buf, *deleted_at);
}
WalOp::DeleteEdge { id, deleted_at } => {
write_u8(buf, OpTag::DeleteEdge as u8);
write_u64(buf, *id);
write_i64(buf, *deleted_at);
}
WalOp::EnsureNodeLabel { label, label_id } => {
write_u8(buf, OpTag::EnsureNodeLabel as u8);
write_str(buf, label)?;
write_u32(buf, *label_id);
}
WalOp::EnsureEdgeLabel { label, label_id } => {
write_u8(buf, OpTag::EnsureEdgeLabel as u8);
write_str(buf, label)?;
write_u32(buf, *label_id);
}
WalOp::BeginAtomicBatch {
first_seq,
op_count,
} => {
write_u8(buf, OpTag::BeginAtomicBatch as u8);
write_u64(buf, *first_seq);
write_u32(buf, *op_count);
}
WalOp::CommitAtomicBatch {
first_seq,
op_count,
} => {
write_u8(buf, OpTag::CommitAtomicBatch as u8);
write_u64(buf, *first_seq);
write_u32(buf, *op_count);
}
}
Ok(())
}
#[cfg(test)]
pub(crate) fn encode_wal_op(op: &WalOp) -> Result<Vec<u8>, EngineError> {
let mut buf = Vec::new();
encode_wal_op_into(op, &mut buf)?;
Ok(buf)
}
fn reject_trailing_bytes(cursor: &Cursor<&[u8]>, context: &str) -> Result<(), EngineError> {
if remaining_bytes(cursor) != 0 {
return Err(EngineError::CorruptRecord(format!(
"unexpected trailing bytes in {context}"
)));
}
Ok(())
}
pub(crate) fn decode_wal_op(data: &[u8]) -> Result<WalOp, EngineError> {
let mut cursor = Cursor::new(data);
let op_tag = read_u8(&mut cursor)?;
match OpTag::from_u8(op_tag) {
Some(OpTag::UpsertNode) => {
let id = read_u64(&mut cursor)?;
let label_ids = read_node_label_set(&mut cursor)?;
let key = read_str(&mut cursor)?;
let created_at = read_i64(&mut cursor)?;
let updated_at = read_i64(&mut cursor)?;
let weight = read_f32(&mut cursor)?;
let props_bytes = read_bytes(&mut cursor)?;
let props: BTreeMap<String, PropValue> = rmp_serde::from_slice(&props_bytes)
.map_err(|e| EngineError::SerializationError(e.to_string()))?;
let (dense_vector, sparse_vector) = if remaining_bytes(&cursor) == 0 {
(None, None)
} else {
let flags = read_u8(&mut cursor)?;
if flags & !0b0000_0011 != 0 {
return Err(EngineError::CorruptRecord(format!(
"invalid vector flags on node WAL op: {:#010b}",
flags
)));
}
let dense_vector = if flags & 0b0000_0001 != 0 {
let len = read_u32(&mut cursor)? as usize;
let mut values = Vec::with_capacity(len);
for _ in 0..len {
values.push(read_f32(&mut cursor)?);
}
Some(values)
} else {
None
};
let sparse_vector = if flags & 0b0000_0010 != 0 {
let len = read_u32(&mut cursor)? as usize;
let mut values = Vec::with_capacity(len);
for _ in 0..len {
let dimension_id = read_u32(&mut cursor)?;
let weight = read_f32(&mut cursor)?;
values.push((dimension_id, weight));
}
Some(values)
} else {
None
};
reject_trailing_bytes(&cursor, "node WAL op")?;
(dense_vector, sparse_vector)
};
Ok(WalOp::UpsertNode(NodeRecord {
id,
label_ids,
key,
props,
created_at,
updated_at,
weight,
dense_vector,
sparse_vector,
last_write_seq: 0,
}))
}
Some(OpTag::UpsertEdge) => {
let id = read_u64(&mut cursor)?;
let from = read_u64(&mut cursor)?;
let to = read_u64(&mut cursor)?;
let label_id = read_u32(&mut cursor)?;
let created_at = read_i64(&mut cursor)?;
let updated_at = read_i64(&mut cursor)?;
let weight = read_f32(&mut cursor)?;
let props_bytes = read_bytes(&mut cursor)?;
let props: BTreeMap<String, PropValue> = rmp_serde::from_slice(&props_bytes)
.map_err(|e| EngineError::SerializationError(e.to_string()))?;
let valid_from = read_i64(&mut cursor)?;
let valid_to = read_i64(&mut cursor)?;
reject_trailing_bytes(&cursor, "edge WAL op")?;
Ok(WalOp::UpsertEdge(EdgeRecord {
id,
from,
to,
label_id,
props,
created_at,
updated_at,
weight,
valid_from,
valid_to,
last_write_seq: 0,
}))
}
Some(OpTag::DeleteNode) => {
let id = read_u64(&mut cursor)?;
let deleted_at = read_i64(&mut cursor)?;
reject_trailing_bytes(&cursor, "delete-node WAL op")?;
Ok(WalOp::DeleteNode { id, deleted_at })
}
Some(OpTag::DeleteEdge) => {
let id = read_u64(&mut cursor)?;
let deleted_at = read_i64(&mut cursor)?;
reject_trailing_bytes(&cursor, "delete-edge WAL op")?;
Ok(WalOp::DeleteEdge { id, deleted_at })
}
Some(OpTag::EnsureNodeLabel) => {
let label = read_str(&mut cursor)?;
let label_id = read_u32(&mut cursor)?;
reject_trailing_bytes(&cursor, "ensure-node-label WAL op")?;
Ok(WalOp::EnsureNodeLabel { label, label_id })
}
Some(OpTag::EnsureEdgeLabel) => {
let label = read_str(&mut cursor)?;
let label_id = read_u32(&mut cursor)?;
reject_trailing_bytes(&cursor, "ensure-edge-label WAL op")?;
Ok(WalOp::EnsureEdgeLabel { label, label_id })
}
Some(OpTag::BeginAtomicBatch) => {
let first_seq = read_u64(&mut cursor)?;
let op_count = read_u32(&mut cursor)?;
reject_trailing_bytes(&cursor, "begin-atomic-batch WAL op")?;
Ok(WalOp::BeginAtomicBatch {
first_seq,
op_count,
})
}
Some(OpTag::CommitAtomicBatch) => {
let first_seq = read_u64(&mut cursor)?;
let op_count = read_u32(&mut cursor)?;
reject_trailing_bytes(&cursor, "commit-atomic-batch WAL op")?;
Ok(WalOp::CommitAtomicBatch {
first_seq,
op_count,
})
}
None => Err(EngineError::CorruptRecord(format!(
"unknown op tag: {}",
op_tag
))),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeMap;
#[test]
fn test_roundtrip_upsert_node() {
let mut props = BTreeMap::new();
props.insert("name".to_string(), PropValue::String("Alice".to_string()));
props.insert("age".to_string(), PropValue::Int(30));
let op = WalOp::UpsertNode(NodeRecord {
id: 42,
label_ids: NodeLabelSet::single(1).unwrap(),
key: "user:alice".to_string(),
props,
created_at: 1000000,
updated_at: 1000001,
weight: 0.95,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
assert_eq!(encoded[9], 1);
assert_eq!(u32::from_le_bytes(encoded[10..14].try_into().unwrap()), 1);
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertNode(node) => {
assert_eq!(node.id, 42);
assert_eq!(node.label_ids.as_slice(), &[1]);
assert_eq!(node.key, "user:alice");
assert_eq!(node.created_at, 1000000);
assert_eq!(node.updated_at, 1000001);
assert!((node.weight - 0.95).abs() < f32::EPSILON);
assert_eq!(
node.props.get("name"),
Some(&PropValue::String("Alice".to_string()))
);
assert_eq!(node.props.get("age"), Some(&PropValue::Int(30)));
}
_ => panic!("expected UpsertNode"),
}
}
#[test]
fn test_roundtrip_upsert_node_with_vectors() {
let op = WalOp::UpsertNode(NodeRecord {
id: 7,
label_ids: NodeLabelSet::single(2).unwrap(),
key: "vector-node".to_string(),
props: BTreeMap::new(),
created_at: 10,
updated_at: 11,
weight: 0.5,
dense_vector: Some(vec![0.1, 0.2, 0.3]),
sparse_vector: Some(vec![(4, 1.5), (9, 2.5)]),
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertNode(node) => {
assert_eq!(node.dense_vector, Some(vec![0.1, 0.2, 0.3]));
assert_eq!(node.sparse_vector, Some(vec![(4, 1.5), (9, 2.5)]));
}
_ => panic!("expected UpsertNode"),
}
}
#[test]
fn test_roundtrip_upsert_node_with_multi_label_sets() {
let cases: &[&[u32]] = &[&[2, 5], &[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]];
for &label_ids in cases {
let op = WalOp::UpsertNode(NodeRecord {
id: 100 + label_ids.len() as u64,
label_ids: NodeLabelSet::from_canonical_ids(label_ids).unwrap(),
key: format!("multi-label-{}", label_ids.len()),
props: BTreeMap::new(),
created_at: 10,
updated_at: 11,
weight: 0.5,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
assert_eq!(encoded[9], label_ids.len() as u8);
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertNode(node) => {
assert_eq!(node.label_ids.as_slice(), label_ids);
assert_eq!(node.key, format!("multi-label-{}", label_ids.len()));
}
_ => panic!("expected UpsertNode"),
}
}
}
#[test]
fn test_decode_legacy_upsert_node_without_vector_payload_is_rejected() {
let mut props = BTreeMap::new();
props.insert("name".to_string(), PropValue::String("legacy".to_string()));
let mut encoded = Vec::new();
write_u8(&mut encoded, OpTag::UpsertNode as u8);
write_u64(&mut encoded, 42);
write_u32(&mut encoded, 1);
write_str(&mut encoded, "legacy").unwrap();
write_i64(&mut encoded, 100);
write_i64(&mut encoded, 101);
write_f32(&mut encoded, 0.75);
let props_bytes = rmp_serde::to_vec(&props).unwrap();
write_bytes(&mut encoded, &props_bytes).unwrap();
let err = decode_wal_op(&encoded).unwrap_err();
assert!(
matches!(
err,
EngineError::CorruptRecord(_) | EngineError::SerializationError(_)
),
"unexpected legacy WAL error: {err}"
);
}
#[test]
fn test_decode_upsert_node_rejects_invalid_label_sets() {
fn encode_with_labels(labels: &[u32]) -> Vec<u8> {
let mut encoded = Vec::new();
write_u8(&mut encoded, OpTag::UpsertNode as u8);
write_u64(&mut encoded, 42);
write_u8(&mut encoded, labels.len() as u8);
for &label_id in labels {
write_u32(&mut encoded, label_id);
}
write_str(&mut encoded, "bad").unwrap();
write_i64(&mut encoded, 1);
write_i64(&mut encoded, 2);
write_f32(&mut encoded, 1.0);
let props_bytes = rmp_serde::to_vec(&BTreeMap::<String, PropValue>::new()).unwrap();
write_bytes(&mut encoded, &props_bytes).unwrap();
write_u8(&mut encoded, 0);
encoded
}
for labels in [&[][..], &[2, 1][..], &[1, 1][..], &[0][..]] {
let err = decode_wal_op(&encode_with_labels(labels)).unwrap_err();
assert!(
err.to_string().contains("node WAL"),
"unexpected error for {labels:?}: {err}"
);
}
}
#[test]
fn test_roundtrip_upsert_edge() {
let mut props = BTreeMap::new();
props.insert("role".to_string(), PropValue::String("owner".to_string()));
let op = WalOp::UpsertEdge(EdgeRecord {
id: 100,
from: 1,
to: 2,
label_id: 10,
props,
created_at: 2000000,
updated_at: 2000001,
weight: 1.0,
valid_from: 1500000,
valid_to: 3000000,
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertEdge(edge) => {
assert_eq!(edge.id, 100);
assert_eq!(edge.from, 1);
assert_eq!(edge.to, 2);
assert_eq!(edge.label_id, 10);
assert_eq!(edge.created_at, 2000000);
assert_eq!(edge.updated_at, 2000001);
assert!((edge.weight - 1.0).abs() < f32::EPSILON);
assert_eq!(edge.valid_from, 1500000);
assert_eq!(edge.valid_to, 3000000);
assert_eq!(
edge.props.get("role"),
Some(&PropValue::String("owner".to_string()))
);
}
_ => panic!("expected UpsertEdge"),
}
}
#[test]
fn test_roundtrip_delete_node() {
let op = WalOp::DeleteNode {
id: 42,
deleted_at: 3000000,
};
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::DeleteNode { id, deleted_at } => {
assert_eq!(id, 42);
assert_eq!(deleted_at, 3000000);
}
_ => panic!("expected DeleteNode"),
}
}
#[test]
fn test_roundtrip_delete_edge() {
let op = WalOp::DeleteEdge {
id: 99,
deleted_at: 4000000,
};
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::DeleteEdge { id, deleted_at } => {
assert_eq!(id, 99);
assert_eq!(deleted_at, 4000000);
}
_ => panic!("expected DeleteEdge"),
}
}
#[test]
fn test_roundtrip_ensure_node_label() {
let op = WalOp::EnsureNodeLabel {
label: "Person".to_string(),
label_id: 7,
};
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::EnsureNodeLabel { label, label_id } => {
assert_eq!(label, "Person");
assert_eq!(label_id, 7);
}
_ => panic!("expected EnsureNodeLabel"),
}
}
#[test]
fn test_roundtrip_ensure_edge_label() {
let op = WalOp::EnsureEdgeLabel {
label: "KNOWS".to_string(),
label_id: 11,
};
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::EnsureEdgeLabel { label, label_id } => {
assert_eq!(label, "KNOWS");
assert_eq!(label_id, 11);
}
_ => panic!("expected EnsureEdgeLabel"),
}
}
#[test]
fn test_roundtrip_atomic_batch_markers() {
let begin = WalOp::BeginAtomicBatch {
first_seq: 42,
op_count: 3,
};
let encoded = encode_wal_op(&begin).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::BeginAtomicBatch {
first_seq,
op_count,
} => {
assert_eq!(first_seq, 42);
assert_eq!(op_count, 3);
}
_ => panic!("expected BeginAtomicBatch"),
}
let commit = WalOp::CommitAtomicBatch {
first_seq: 42,
op_count: 3,
};
let encoded = encode_wal_op(&commit).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::CommitAtomicBatch {
first_seq,
op_count,
} => {
assert_eq!(first_seq, 42);
assert_eq!(op_count, 3);
}
_ => panic!("expected CommitAtomicBatch"),
}
}
#[test]
fn test_roundtrip_empty_props() {
let op = WalOp::UpsertNode(NodeRecord {
id: 1,
label_ids: NodeLabelSet::single(1).unwrap(),
key: "test".to_string(),
props: BTreeMap::new(),
created_at: 0,
updated_at: 0,
weight: 0.0,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertNode(node) => {
assert!(node.props.is_empty());
}
_ => panic!("expected UpsertNode"),
}
}
#[test]
#[allow(clippy::approx_constant)]
fn test_roundtrip_all_prop_types() {
let mut props = BTreeMap::new();
props.insert("null_val".to_string(), PropValue::Null);
props.insert("bool_val".to_string(), PropValue::Bool(true));
props.insert("int_val".to_string(), PropValue::Int(-42));
props.insert("uint_val".to_string(), PropValue::UInt(999));
props.insert("float_val".to_string(), PropValue::Float(3.14));
props.insert(
"string_val".to_string(),
PropValue::String("hello".to_string()),
);
props.insert("bytes_val".to_string(), PropValue::Bytes(vec![1, 2, 3]));
props.insert(
"array_val".to_string(),
PropValue::Array(vec![
PropValue::Int(1),
PropValue::String("two".to_string()),
]),
);
let op = WalOp::UpsertNode(NodeRecord {
id: 1,
label_ids: NodeLabelSet::single(1).unwrap(),
key: "test".to_string(),
props,
created_at: 0,
updated_at: 0,
weight: 1.0,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertNode(node) => {
assert_eq!(node.props.len(), 8);
assert_eq!(node.props.get("null_val"), Some(&PropValue::Null));
assert_eq!(node.props.get("bool_val"), Some(&PropValue::Bool(true)));
assert_eq!(node.props.get("int_val"), Some(&PropValue::Int(-42)));
assert_eq!(node.props.get("uint_val"), Some(&PropValue::UInt(999)));
if let Some(PropValue::Float(f)) = node.props.get("float_val") {
assert!((f - 3.14).abs() < f64::EPSILON);
} else {
panic!("expected Float");
}
assert_eq!(
node.props.get("string_val"),
Some(&PropValue::String("hello".to_string()))
);
assert_eq!(
node.props.get("bytes_val"),
Some(&PropValue::Bytes(vec![1, 2, 3]))
);
}
_ => panic!("expected UpsertNode"),
}
}
#[test]
fn test_invalid_op_tag() {
let data = vec![255u8, 0, 0, 0, 0, 0, 0, 0, 0];
let result = decode_wal_op(&data);
assert!(result.is_err());
}
#[test]
fn test_truncated_data() {
let data = vec![1u8, 0, 0]; let result = decode_wal_op(&data);
assert!(result.is_err());
}
#[test]
fn test_node_encoding_deterministic_size() {
let op = WalOp::DeleteNode {
id: 1,
deleted_at: 1000,
};
let encoded = encode_wal_op(&op).unwrap();
assert_eq!(encoded.len(), 17);
}
#[test]
fn test_string_key_overflow_rejected() {
let long_key = "x".repeat(65536);
let op = WalOp::UpsertNode(NodeRecord {
id: 1,
label_ids: NodeLabelSet::single(1).unwrap(),
key: long_key,
props: BTreeMap::new(),
created_at: 0,
updated_at: 0,
weight: 0.0,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
});
let result = encode_wal_op(&op);
assert!(result.is_err());
let err_msg = format!("{}", result.unwrap_err());
assert!(err_msg.contains("string too long"));
}
#[test]
fn test_string_key_at_max_length_ok() {
let max_key = "x".repeat(u16::MAX as usize);
let op = WalOp::UpsertNode(NodeRecord {
id: 1,
label_ids: NodeLabelSet::single(1).unwrap(),
key: max_key.clone(),
props: BTreeMap::new(),
created_at: 0,
updated_at: 0,
weight: 0.0,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
});
let encoded = encode_wal_op(&op).unwrap();
let decoded = decode_wal_op(&encoded).unwrap();
match decoded {
WalOp::UpsertNode(node) => assert_eq!(node.key.len(), u16::MAX as usize),
_ => panic!("expected UpsertNode"),
}
}
#[test]
fn test_encode_wal_op_into_reuses_buffer() {
let mut buf = Vec::new();
let op1 = WalOp::DeleteNode {
id: 1,
deleted_at: 1000,
};
encode_wal_op_into(&op1, &mut buf).unwrap();
let ptr1 = buf.as_ptr();
let cap1 = buf.capacity();
let op2 = WalOp::DeleteNode {
id: 2,
deleted_at: 2000,
};
encode_wal_op_into(&op2, &mut buf).unwrap();
assert_eq!(buf.as_ptr(), ptr1);
assert_eq!(buf.capacity(), cap1);
let decoded = decode_wal_op(&buf).unwrap();
match decoded {
WalOp::DeleteNode { id, .. } => assert_eq!(id, 2),
_ => panic!("expected DeleteNode"),
}
}
#[test]
fn test_read_bytes_rejects_oversized_length() {
let mut data = Vec::new();
let huge_len: u32 = 128 * 1024 * 1024;
data.extend_from_slice(&huge_len.to_le_bytes());
data.extend_from_slice(&[0u8; 16]); let mut cursor = Cursor::new(data.as_slice());
let result = read_bytes(&mut cursor);
assert!(result.is_err());
}
}