use crate::constants::*;
use crate::types::*;
use crate::util::binary::*;
use crate::util::crc::crc32c;
/// An in-memory WAL record that has not been serialized yet.
///
/// `WalRecord::build` turns it into its byte representation.
#[derive(Debug, Clone)]
pub struct WalRecord {
/// Kind of operation this record describes; written as a single byte.
pub record_type: WalRecordType,
/// Id of the transaction this record belongs to.
pub txid: TxId,
/// Record-specific payload bytes, copied verbatim after the header.
pub payload: Vec<u8>,
}
impl WalRecord {
pub fn new(record_type: WalRecordType, txid: TxId, payload: Vec<u8>) -> Self {
Self {
record_type,
txid,
payload,
}
}
pub fn estimated_size(&self) -> usize {
let header_size = WAL_RECORD_HEADER_SIZE;
let crc_size = 4;
let unpadded = header_size + self.payload.len() + crc_size;
align_up(unpadded, WAL_RECORD_ALIGNMENT)
}
pub fn build(&self) -> Vec<u8> {
let header_size = WAL_RECORD_HEADER_SIZE;
let crc_size = 4;
let unpadded = header_size + self.payload.len() + crc_size;
let pad_len = padding_for(unpadded, WAL_RECORD_ALIGNMENT);
let total_size = unpadded + pad_len;
let mut buffer = vec![0u8; total_size];
write_u32(&mut buffer, 0, unpadded as u32); buffer[4] = self.record_type as u8;
buffer[5] = 0; write_u16(&mut buffer, 6, 0); write_u64(&mut buffer, 8, self.txid);
write_u32(&mut buffer, 16, self.payload.len() as u32);
buffer[WAL_RECORD_HEADER_SIZE..WAL_RECORD_HEADER_SIZE + self.payload.len()]
.copy_from_slice(&self.payload);
let crc_start = 4; let crc_end = WAL_RECORD_HEADER_SIZE + self.payload.len();
let crc_value = crc32c(&buffer[crc_start..crc_end]);
write_u32(&mut buffer, crc_end, crc_value);
buffer
}
}
/// A WAL record decoded from a byte buffer by `parse_wal_record`.
#[derive(Debug, Clone)]
pub struct ParsedWalRecord {
/// Record type decoded from the header's type byte.
pub record_type: WalRecordType,
/// Flags byte read from the header.
pub flags: u8,
/// Transaction id read from the header.
pub txid: TxId,
/// Owned copy of the record's payload bytes.
pub payload: Vec<u8>,
/// Buffer offset just past this record, alignment padding included.
pub record_end: usize, }
/// Decodes the WAL record that starts at `offset` in `buffer`.
///
/// Returns `None` when:
/// - `offset` is out of range or fewer than 4 bytes remain,
/// - the stored record length is smaller than header + CRC, or the padded
///   record does not fit in the remaining bytes,
/// - the stored payload length disagrees with the stored record length,
/// - the stored CRC does not match the CRC32C of bytes `4..payload end`,
/// - the type byte is rejected by `WalRecordType::from_u8`.
///
/// All bounds checks are done against the number of bytes remaining after
/// `offset`, so untrusted lengths or an oversized `offset` cannot overflow the
/// arithmetic. The payload is only copied once the checksum has been verified.
pub fn parse_wal_record(buffer: &[u8], offset: usize) -> Option<ParsedWalRecord> {
    const LEN_FIELD_SIZE: usize = 4;
    const CRC_SIZE: usize = 4;

    // Bytes available from `offset` onwards; `None` if `offset` is past the end.
    let available = buffer.len().checked_sub(offset)?;
    if available < LEN_FIELD_SIZE {
        return None;
    }

    let rec_len = read_u32(buffer, offset) as usize;
    if rec_len < WAL_RECORD_HEADER_SIZE + CRC_SIZE || rec_len > available {
        return None;
    }

    // `rec_len` is bounded by the buffer length here, so adding padding cannot overflow.
    let total_len = rec_len + padding_for(rec_len, WAL_RECORD_ALIGNMENT);
    if total_len > available {
        return None;
    }

    let record_type_byte = buffer[offset + 4];
    let flags = buffer[offset + 5];
    let txid = read_u64(buffer, offset + 8);
    let payload_len = read_u32(buffer, offset + 16) as usize;

    // Header + payload + CRC must add up to exactly the stored record length.
    if payload_len != rec_len - WAL_RECORD_HEADER_SIZE - CRC_SIZE {
        return None;
    }

    let payload_start = offset + WAL_RECORD_HEADER_SIZE;
    let payload_end = payload_start + payload_len;

    // The checksum covers everything after the length field up to the end of the payload.
    let stored_crc = read_u32(buffer, payload_end);
    let computed_crc = crc32c(&buffer[offset + LEN_FIELD_SIZE..payload_end]);
    if stored_crc != computed_crc {
        return None;
    }

    let record_type = WalRecordType::from_u8(record_type_byte)?;

    Some(ParsedWalRecord {
        record_type,
        flags,
        txid,
        payload: buffer[payload_start..payload_end].to_vec(),
        record_end: offset + total_len,
    })
}
/// Decodes consecutive records starting at offset 0 of `buffer`.
///
/// Scanning stops at the end of the buffer or at the first position where
/// `parse_wal_record` returns `None`; the records decoded up to that point
/// are returned in order.
pub fn scan_wal(buffer: &[u8]) -> Vec<ParsedWalRecord> {
    let mut cursor = 0;
    std::iter::from_fn(|| {
        if cursor >= buffer.len() {
            return None;
        }
        let record = parse_wal_record(buffer, cursor)?;
        // Each record reports where the next one begins.
        cursor = record.record_end;
        Some(record)
    })
    .collect()
}
/// Groups the data records of every committed transaction by transaction id.
///
/// Records are replayed in order:
/// - `Begin` opens (or resets) the pending list for its transaction,
/// - `Commit` moves the pending list into the result; it is ignored when the
///   transaction is not pending,
/// - `Rollback` discards the pending list,
/// - any other record is appended to its transaction's pending list, or
///   dropped when that transaction is not pending.
///
/// Transactions still pending at the end of `records` are not returned. The
/// `Begin`/`Commit`/`Rollback` records themselves are never part of a list.
pub fn extract_committed_transactions(
    records: &[ParsedWalRecord],
) -> std::collections::HashMap<TxId, Vec<&ParsedWalRecord>> {
    use std::collections::HashMap;

    let mut open: HashMap<TxId, Vec<&ParsedWalRecord>> = HashMap::new();
    let mut finished: HashMap<TxId, Vec<&ParsedWalRecord>> = HashMap::new();

    for entry in records.iter() {
        match &entry.record_type {
            WalRecordType::Begin => {
                open.insert(entry.txid, Vec::new());
            }
            WalRecordType::Commit => {
                if let Some(ops) = open.remove(&entry.txid) {
                    finished.insert(entry.txid, ops);
                }
            }
            WalRecordType::Rollback => {
                open.remove(&entry.txid);
            }
            _ => {
                if let Some(ops) = open.get_mut(&entry.txid) {
                    ops.push(entry);
                }
            }
        }
    }

    finished
}
/// Returns an empty payload; judging by the name, it is meant for `Begin` records.
pub fn build_begin_payload() -> Vec<u8> {
Vec::new()
}
/// Returns an empty payload; judging by the name, it is meant for `Commit` records.
pub fn build_commit_payload() -> Vec<u8> {
Vec::new()
}
/// Returns an empty payload; judging by the name, it is meant for `Rollback` records.
pub fn build_rollback_payload() -> Vec<u8> {
Vec::new()
}
/// Encodes a create-node payload.
///
/// Layout: node id (8 bytes), key length (4 bytes), key bytes. A missing key
/// is encoded exactly like an empty one: length 0 and no key bytes.
pub fn build_create_node_payload(node_id: NodeId, key: Option<&str>) -> Vec<u8> {
    const KEY_OFFSET: usize = 8 + 4;

    let key_bytes = key.unwrap_or("").as_bytes();
    let mut out = vec![0u8; KEY_OFFSET + key_bytes.len()];
    write_u64(&mut out, 0, node_id);
    write_u32(&mut out, 8, key_bytes.len() as u32);
    out[KEY_OFFSET..].copy_from_slice(key_bytes);
    out
}
/// Encodes a batch of create-node entries.
///
/// Layout: entry count (4 bytes), then per entry: node id (8 bytes),
/// key length (4 bytes) and the key bytes. A missing key is written as
/// length 0 with no key bytes.
pub fn build_create_nodes_batch_payload(entries: &[(NodeId, Option<&str>)]) -> Vec<u8> {
    const COUNT_LEN: usize = 4;
    const ENTRY_HEADER_LEN: usize = 8 + 4;

    let body_len: usize = entries
        .iter()
        .map(|(_, key)| ENTRY_HEADER_LEN + key.map_or(0, str::len))
        .sum();

    let mut out = vec![0u8; COUNT_LEN + body_len];
    write_u32(&mut out, 0, entries.len() as u32);

    let mut cursor = COUNT_LEN;
    for &(node_id, key) in entries {
        let key_bytes = key.unwrap_or("").as_bytes();
        write_u64(&mut out, cursor, node_id);
        write_u32(&mut out, cursor + 8, key_bytes.len() as u32);
        cursor += ENTRY_HEADER_LEN;
        out[cursor..cursor + key_bytes.len()].copy_from_slice(key_bytes);
        cursor += key_bytes.len();
    }
    out
}
/// Encodes a delete-node payload: just the 8-byte node id.
pub fn build_delete_node_payload(node_id: NodeId) -> Vec<u8> {
    const PAYLOAD_LEN: usize = 8;

    let mut out = [0u8; PAYLOAD_LEN].to_vec();
    write_u64(&mut out, 0, node_id);
    out
}
/// Encodes an edge triple.
///
/// Layout: source node id (8 bytes), edge type id (4 bytes), destination
/// node id (8 bytes).
pub fn build_add_edge_payload(src: NodeId, etype: ETypeId, dst: NodeId) -> Vec<u8> {
    const ETYPE_OFFSET: usize = 8;
    const DST_OFFSET: usize = ETYPE_OFFSET + 4;
    const PAYLOAD_LEN: usize = DST_OFFSET + 8;

    let mut out = vec![0u8; PAYLOAD_LEN];
    write_u64(&mut out, 0, src);
    write_u32(&mut out, ETYPE_OFFSET, etype);
    write_u64(&mut out, DST_OFFSET, dst);
    out
}
/// Encodes a batch of edges.
///
/// Layout: edge count (4 bytes), then one fixed 20-byte entry per edge:
/// source id (8 bytes), edge type id (4 bytes), destination id (8 bytes).
pub fn build_add_edges_batch_payload(edges: &[(NodeId, ETypeId, NodeId)]) -> Vec<u8> {
    const COUNT_LEN: usize = 4;
    const EDGE_LEN: usize = 8 + 4 + 8;

    let mut out = vec![0u8; COUNT_LEN + edges.len() * EDGE_LEN];
    write_u32(&mut out, 0, edges.len() as u32);

    for (index, &(src, etype, dst)) in edges.iter().enumerate() {
        // Entries are fixed-size, so each position follows from the index.
        let base = COUNT_LEN + index * EDGE_LEN;
        write_u64(&mut out, base, src);
        write_u32(&mut out, base + 8, etype);
        write_u64(&mut out, base + 12, dst);
    }
    out
}
/// Encodes an edge together with a list of properties.
///
/// Layout: source id (8 bytes), edge type id (4 bytes), destination id
/// (8 bytes), property count (4 bytes), then per property: key id (4 bytes)
/// followed by the value as written by `write_prop_value`.
pub fn build_add_edge_props_payload(
    src: NodeId,
    etype: ETypeId,
    dst: NodeId,
    props: &[(PropKeyId, PropValue)],
) -> Vec<u8> {
    const HEADER_LEN: usize = 8 + 4 + 8 + 4;
    const KEY_ID_LEN: usize = 4;

    let props_len: usize = props
        .iter()
        .map(|(_, value)| KEY_ID_LEN + prop_value_serialized_len(value))
        .sum();

    let mut out = vec![0u8; HEADER_LEN + props_len];
    write_u64(&mut out, 0, src);
    write_u32(&mut out, 8, etype);
    write_u64(&mut out, 12, dst);
    write_u32(&mut out, 20, props.len() as u32);

    let mut cursor = HEADER_LEN;
    for (key_id, value) in props {
        write_u32(&mut out, cursor, *key_id);
        let value_start = cursor + KEY_ID_LEN;
        write_prop_value(value, &mut out[value_start..]);
        cursor = value_start + prop_value_serialized_len(value);
    }
    out
}
/// Encodes a batch of edges, each with its own property list.
///
/// Layout: edge count (4 bytes), then per edge: source id (8 bytes), edge
/// type id (4 bytes), destination id (8 bytes), property count (4 bytes) and
/// for every property its key id (4 bytes) followed by the value as written
/// by `write_prop_value`.
pub fn build_add_edges_props_batch_payload(edges: &[EdgeWithProps]) -> Vec<u8> {
    const COUNT_LEN: usize = 4;
    const EDGE_HEADER_LEN: usize = 8 + 4 + 8 + 4;
    const KEY_ID_LEN: usize = 4;

    let body_len: usize = edges
        .iter()
        .map(|(_, _, _, props)| {
            let props_len: usize = props
                .iter()
                .map(|(_, value)| KEY_ID_LEN + prop_value_serialized_len(value))
                .sum();
            EDGE_HEADER_LEN + props_len
        })
        .sum();

    let mut out = vec![0u8; COUNT_LEN + body_len];
    write_u32(&mut out, 0, edges.len() as u32);

    let mut cursor = COUNT_LEN;
    for (src, etype, dst, props) in edges {
        write_u64(&mut out, cursor, *src);
        write_u32(&mut out, cursor + 8, *etype);
        write_u64(&mut out, cursor + 12, *dst);
        write_u32(&mut out, cursor + 20, props.len() as u32);
        cursor += EDGE_HEADER_LEN;

        for (key_id, value) in props.iter() {
            write_u32(&mut out, cursor, *key_id);
            write_prop_value(value, &mut out[cursor + KEY_ID_LEN..]);
            cursor += KEY_ID_LEN + prop_value_serialized_len(value);
        }
    }
    out
}
/// Builds a delete-edge payload by delegating to `build_add_edge_payload`,
/// so both payloads share the same (src, etype, dst) layout.
pub fn build_delete_edge_payload(src: NodeId, etype: ETypeId, dst: NodeId) -> Vec<u8> {
build_add_edge_payload(src, etype, dst)
}
/// Encodes an id-to-name definition.
///
/// Layout: id (4 bytes), name length in bytes (4 bytes), name bytes (UTF-8,
/// as stored in `name`).
pub fn build_define_label_payload(label_id: LabelId, name: &str) -> Vec<u8> {
    const NAME_OFFSET: usize = 4 + 4;

    let raw_name = name.as_bytes();
    let mut out = vec![0u8; NAME_OFFSET + raw_name.len()];
    write_u32(&mut out, 0, label_id);
    write_u32(&mut out, 4, raw_name.len() as u32);
    out[NAME_OFFSET..].copy_from_slice(raw_name);
    out
}
/// Builds an edge-type definition payload by delegating to
/// `build_define_label_payload`; the layout is the same id + name encoding.
pub fn build_define_etype_payload(etype_id: ETypeId, name: &str) -> Vec<u8> {
build_define_label_payload(etype_id, name)
}
/// Builds a property-key definition payload by delegating to
/// `build_define_label_payload`; the layout is the same id + name encoding.
pub fn build_define_propkey_payload(propkey_id: PropKeyId, name: &str) -> Vec<u8> {
build_define_label_payload(propkey_id, name)
}
/// Returns how many bytes `write_prop_value` needs for `value`.
///
/// Every value starts with a 1-byte tag; strings and vectors add a 4-byte
/// length prefix before their data.
fn prop_value_serialized_len(value: &PropValue) -> usize {
    const TAG_LEN: usize = 1;
    const LEN_PREFIX: usize = 4;

    let body_len = match value {
        PropValue::Null => 0,
        PropValue::Bool(_) => 1,
        PropValue::I64(_) | PropValue::F64(_) => 8,
        PropValue::String(text) => LEN_PREFIX + text.len(),
        PropValue::VectorF32(items) => LEN_PREFIX + items.len() * 4,
    };
    TAG_LEN + body_len
}
/// Writes `value` at the start of `buffer`.
///
/// Encoding: a 1-byte tag (0 = null, 1 = bool, 2 = i64, 3 = f64, 4 = string,
/// 5 = f32 vector) followed by the value's data. Strings and vectors carry a
/// 4-byte length prefix (byte length for strings, element count for
/// vectors); vector elements are little-endian `f32`s.
///
/// # Panics
///
/// Panics if `buffer` is shorter than `prop_value_serialized_len(value)`.
fn write_prop_value(value: &PropValue, buffer: &mut [u8]) {
    match value {
        PropValue::Null => buffer[0] = 0,
        PropValue::Bool(flag) => {
            buffer[0] = 1;
            buffer[1] = u8::from(*flag);
        }
        PropValue::I64(number) => {
            buffer[0] = 2;
            write_i64(buffer, 1, *number);
        }
        PropValue::F64(number) => {
            buffer[0] = 3;
            write_f64(buffer, 1, *number);
        }
        PropValue::String(text) => {
            buffer[0] = 4;
            write_u32(buffer, 1, text.len() as u32);
            buffer[5..5 + text.len()].copy_from_slice(text.as_bytes());
        }
        PropValue::VectorF32(items) => {
            buffer[0] = 5;
            write_u32(buffer, 1, items.len() as u32);
            let body = &mut buffer[5..5 + items.len() * 4];
            for (slot, item) in body.chunks_exact_mut(4).zip(items) {
                slot.copy_from_slice(&item.to_le_bytes());
            }
        }
    }
}
/// Serializes `value` into a new buffer sized exactly by
/// `prop_value_serialized_len`.
fn serialize_prop_value(value: &PropValue) -> Vec<u8> {
    let needed = prop_value_serialized_len(value);
    let mut encoded = vec![0u8; needed];
    write_prop_value(value, encoded.as_mut_slice());
    encoded
}
/// Encodes a set-node-property payload.
///
/// Layout: node id (8 bytes), property key id (4 bytes), then the value as
/// written by `write_prop_value`.
pub fn build_set_node_prop_payload(
    node_id: NodeId,
    key_id: PropKeyId,
    value: &PropValue,
) -> Vec<u8> {
    const KEY_ID_OFFSET: usize = 8;
    const VALUE_OFFSET: usize = KEY_ID_OFFSET + 4;

    let mut out = vec![0u8; VALUE_OFFSET + prop_value_serialized_len(value)];
    write_u64(&mut out, 0, node_id);
    write_u32(&mut out, KEY_ID_OFFSET, key_id);
    write_prop_value(value, &mut out[VALUE_OFFSET..]);
    out
}
/// Encodes a delete-node-property payload: node id (8 bytes) followed by the
/// property key id (4 bytes).
pub fn build_del_node_prop_payload(node_id: NodeId, key_id: PropKeyId) -> Vec<u8> {
    const KEY_ID_OFFSET: usize = 8;
    const PAYLOAD_LEN: usize = KEY_ID_OFFSET + 4;

    let mut out = vec![0u8; PAYLOAD_LEN];
    write_u64(&mut out, 0, node_id);
    write_u32(&mut out, KEY_ID_OFFSET, key_id);
    out
}
/// Encodes a set-edge-property payload.
///
/// Layout: source id (8 bytes), edge type id (4 bytes), destination id
/// (8 bytes), property key id (4 bytes), then the value as written by
/// `write_prop_value`.
pub fn build_set_edge_prop_payload(
    src: NodeId,
    etype: ETypeId,
    dst: NodeId,
    key_id: PropKeyId,
    value: &PropValue,
) -> Vec<u8> {
    const ETYPE_OFFSET: usize = 8;
    const DST_OFFSET: usize = ETYPE_OFFSET + 4;
    const KEY_ID_OFFSET: usize = DST_OFFSET + 8;
    const VALUE_OFFSET: usize = KEY_ID_OFFSET + 4;

    let mut out = vec![0u8; VALUE_OFFSET + prop_value_serialized_len(value)];
    write_u64(&mut out, 0, src);
    write_u32(&mut out, ETYPE_OFFSET, etype);
    write_u64(&mut out, DST_OFFSET, dst);
    write_u32(&mut out, KEY_ID_OFFSET, key_id);
    write_prop_value(value, &mut out[VALUE_OFFSET..]);
    out
}
pub fn build_set_edge_props_payload(
src: NodeId,
etype: ETypeId,
dst: NodeId,
props: &[(PropKeyId, PropValue)],
) -> Vec<u8> {
let mut total_len = 8 + 4 + 8 + 4; for (_, value) in props.iter() {
total_len += 4 + prop_value_serialized_len(value);
}
let mut buffer = vec![0u8; total_len];
write_u64(&mut buffer, 0, src);
write_u32(&mut buffer, 8, etype);
write_u64(&mut buffer, 12, dst);
write_u32(&mut buffer, 20, props.len() as u32);
let mut offset = 24;
for (key_id, value) in props.iter() {
write_u32(&mut buffer, offset, *key_id);
offset += 4;
write_prop_value(value, &mut buffer[offset..]);
offset += prop_value_serialized_len(value);
}
buffer
}
/// Encodes a delete-edge-property payload.
///
/// Layout: source id (8 bytes), edge type id (4 bytes), destination id
/// (8 bytes), property key id (4 bytes).
pub fn build_del_edge_prop_payload(
    src: NodeId,
    etype: ETypeId,
    dst: NodeId,
    key_id: PropKeyId,
) -> Vec<u8> {
    const ETYPE_OFFSET: usize = 8;
    const DST_OFFSET: usize = ETYPE_OFFSET + 4;
    const KEY_ID_OFFSET: usize = DST_OFFSET + 8;
    const PAYLOAD_LEN: usize = KEY_ID_OFFSET + 4;

    let mut out = vec![0u8; PAYLOAD_LEN];
    write_u64(&mut out, 0, src);
    write_u32(&mut out, ETYPE_OFFSET, etype);
    write_u64(&mut out, DST_OFFSET, dst);
    write_u32(&mut out, KEY_ID_OFFSET, key_id);
    out
}
/// Encodes a set-node-vector payload.
///
/// Layout: node id (8 bytes), property key id (4 bytes), element count
/// (4 bytes), then every element as a little-endian `f32`.
pub fn build_set_node_vector_payload(
    node_id: NodeId,
    prop_key_id: PropKeyId,
    vector: &[f32],
) -> Vec<u8> {
    const HEADER_LEN: usize = 8 + 4 + 4;
    const ELEMENT_LEN: usize = 4;

    let mut out = vec![0u8; HEADER_LEN + vector.len() * ELEMENT_LEN];
    write_u64(&mut out, 0, node_id);
    write_u32(&mut out, 8, prop_key_id);
    write_u32(&mut out, 12, vector.len() as u32);

    let slots = out[HEADER_LEN..].chunks_exact_mut(ELEMENT_LEN);
    for (slot, element) in slots.zip(vector) {
        slot.copy_from_slice(&element.to_le_bytes());
    }
    out
}
/// Encodes a delete-node-vector payload: node id (8 bytes) followed by the
/// property key id (4 bytes).
pub fn build_del_node_vector_payload(node_id: NodeId, prop_key_id: PropKeyId) -> Vec<u8> {
    const KEY_ID_OFFSET: usize = 8;
    const PAYLOAD_LEN: usize = KEY_ID_OFFSET + 4;

    let mut out = vec![0u8; PAYLOAD_LEN];
    write_u64(&mut out, 0, node_id);
    write_u32(&mut out, KEY_ID_OFFSET, prop_key_id);
    out
}
/// Encodes a node/label pair: node id (8 bytes) followed by the label id
/// (4 bytes).
pub fn build_add_node_label_payload(node_id: NodeId, label_id: LabelId) -> Vec<u8> {
    const LABEL_OFFSET: usize = 8;
    const PAYLOAD_LEN: usize = LABEL_OFFSET + 4;

    let mut out = vec![0u8; PAYLOAD_LEN];
    write_u64(&mut out, 0, node_id);
    write_u32(&mut out, LABEL_OFFSET, label_id);
    out
}
/// Builds a remove-node-label payload by delegating to
/// `build_add_node_label_payload`; both share the node id + label id layout.
pub fn build_remove_node_label_payload(node_id: NodeId, label_id: LabelId) -> Vec<u8> {
build_add_node_label_payload(node_id, label_id)
}
/// Decoded create-node payload.
#[derive(Debug, Clone)]
pub struct CreateNodeData {
/// Id of the node.
pub node_id: NodeId,
/// Key of the node; `None` when the payload stores a zero-length key.
pub key: Option<String>,
}
/// Decodes a create-node payload: node id (8 bytes), key length (4 bytes),
/// key bytes.
///
/// Returns `None` when the payload is shorter than the 12-byte header or when
/// the declared key length exceeds the bytes that follow. A truncated key is
/// rejected rather than silently decoded as "no key", matching
/// `parse_create_nodes_batch_payload`.
///
/// A zero-length key yields `key: None`. A key that is not valid UTF-8 also
/// yields `key: None`; the node id is still returned in that case.
pub fn parse_create_node_payload(payload: &[u8]) -> Option<CreateNodeData> {
    const KEY_OFFSET: usize = 8 + 4;

    if payload.len() < KEY_OFFSET {
        return None;
    }
    let node_id = read_u64(payload, 0);
    let key_len = read_u32(payload, 8) as usize;

    // Compare against the remaining bytes so the check cannot overflow.
    let key_area = &payload[KEY_OFFSET..];
    if key_len > key_area.len() {
        return None;
    }

    let key = if key_len == 0 {
        None
    } else {
        String::from_utf8(key_area[..key_len].to_vec()).ok()
    };
    Some(CreateNodeData { node_id, key })
}
/// Decodes a batch of create-node entries: entry count (4 bytes), then per
/// entry node id (8 bytes), key length (4 bytes) and key bytes.
///
/// Returns `None` when the payload ends before all declared entries (or any
/// declared key) have been read. A zero-length key, or a key that is not
/// valid UTF-8, yields `key: None` for that entry.
///
/// The declared count comes straight from the payload, so the up-front
/// allocation is capped by how many minimal entries the payload could
/// actually contain; a corrupt count can no longer request a huge allocation.
pub fn parse_create_nodes_batch_payload(payload: &[u8]) -> Option<Vec<CreateNodeData>> {
    const COUNT_LEN: usize = 4;
    const ENTRY_HEADER_LEN: usize = 8 + 4;

    if payload.len() < COUNT_LEN {
        return None;
    }
    let count = read_u32(payload, 0) as usize;

    let max_entries = (payload.len() - COUNT_LEN) / ENTRY_HEADER_LEN;
    let mut nodes = Vec::with_capacity(count.min(max_entries));

    // Invariant: `offset <= payload.len()`, so the subtractions below cannot underflow.
    let mut offset = COUNT_LEN;
    for _ in 0..count {
        if payload.len() - offset < ENTRY_HEADER_LEN {
            return None;
        }
        let node_id = read_u64(payload, offset);
        let key_len = read_u32(payload, offset + 8) as usize;
        offset += ENTRY_HEADER_LEN;

        if key_len > payload.len() - offset {
            return None;
        }
        let key = if key_len > 0 {
            String::from_utf8(payload[offset..offset + key_len].to_vec()).ok()
        } else {
            None
        };
        offset += key_len;

        nodes.push(CreateNodeData { node_id, key });
    }
    Some(nodes)
}
/// Decoded delete-node payload.
#[derive(Debug, Clone)]
pub struct DeleteNodeData {
/// Id of the node, read from the first 8 payload bytes.
pub node_id: NodeId,
}
/// Decodes a delete-node payload (an 8-byte node id).
///
/// Returns `None` when the payload is shorter than 8 bytes; extra trailing
/// bytes are ignored.
pub fn parse_delete_node_payload(payload: &[u8]) -> Option<DeleteNodeData> {
    const MIN_LEN: usize = 8;

    if payload.len() >= MIN_LEN {
        let node_id = read_u64(payload, 0);
        Some(DeleteNodeData { node_id })
    } else {
        None
    }
}
/// Decoded edge triple; also returned by `parse_delete_edge_payload`.
#[derive(Debug, Clone)]
pub struct AddEdgeData {
/// Source node id.
pub src: NodeId,
/// Edge type id.
pub etype: ETypeId,
/// Destination node id.
pub dst: NodeId,
}
/// Decodes an edge triple: source id (8 bytes), edge type id (4 bytes),
/// destination id (8 bytes).
///
/// Returns `None` when the payload is shorter than 20 bytes; extra trailing
/// bytes are ignored.
pub fn parse_add_edge_payload(payload: &[u8]) -> Option<AddEdgeData> {
    const MIN_LEN: usize = 8 + 4 + 8;

    if payload.len() >= MIN_LEN {
        let src = read_u64(payload, 0);
        let etype = read_u32(payload, 8);
        let dst = read_u64(payload, 12);
        Some(AddEdgeData { src, etype, dst })
    } else {
        None
    }
}
/// Decodes a batch of edges: edge count (4 bytes), then one 20-byte entry per
/// edge (source id, edge type id, destination id).
///
/// Returns `None` when the payload is shorter than the count field or does
/// not contain all declared edges; extra trailing bytes are ignored.
///
/// The declared count is validated against the payload length before
/// anything is allocated, so a corrupt count cannot trigger a huge
/// allocation.
pub fn parse_add_edges_batch_payload(payload: &[u8]) -> Option<Vec<AddEdgeData>> {
    const COUNT_LEN: usize = 4;
    const EDGE_LEN: usize = 8 + 4 + 8;

    if payload.len() < COUNT_LEN {
        return None;
    }
    let count = read_u32(payload, 0) as usize;

    // Entries are fixed-size, so the whole batch can be bounds-checked at once.
    let needed = count.checked_mul(EDGE_LEN)?;
    if payload.len() - COUNT_LEN < needed {
        return None;
    }

    let edges = (0..count)
        .map(|index| {
            let base = COUNT_LEN + index * EDGE_LEN;
            AddEdgeData {
                src: read_u64(payload, base),
                etype: read_u32(payload, base + 8),
                dst: read_u64(payload, base + 12),
            }
        })
        .collect();
    Some(edges)
}
/// Decoded edge together with its list of properties.
#[derive(Debug, Clone)]
pub struct AddEdgePropsData {
/// Source node id.
pub src: NodeId,
/// Edge type id.
pub etype: ETypeId,
/// Destination node id.
pub dst: NodeId,
/// Property key id / value pairs, in payload order.
pub props: Vec<(PropKeyId, PropValue)>,
}
/// Decodes an edge with properties: source id (8 bytes), edge type id
/// (4 bytes), destination id (8 bytes), property count (4 bytes), then per
/// property a key id (4 bytes) and a value decoded by `parse_prop_value`.
///
/// Returns `None` when the payload is shorter than the 24-byte header, ends
/// before all declared properties are read, or contains a value that
/// `parse_prop_value` rejects.
///
/// The declared count comes from the payload, so the up-front allocation is
/// capped by the number of minimal properties (4-byte key id + 1-byte value)
/// that could fit in the remaining bytes.
pub fn parse_add_edge_props_payload(payload: &[u8]) -> Option<AddEdgePropsData> {
    const HEADER_LEN: usize = 8 + 4 + 8 + 4;
    const KEY_ID_LEN: usize = 4;
    const MIN_PROP_LEN: usize = KEY_ID_LEN + 1;

    if payload.len() < HEADER_LEN {
        return None;
    }
    let src = read_u64(payload, 0);
    let etype = read_u32(payload, 8);
    let dst = read_u64(payload, 12);
    let count = read_u32(payload, 20) as usize;

    let max_props = (payload.len() - HEADER_LEN) / MIN_PROP_LEN;
    let mut props = Vec::with_capacity(count.min(max_props));

    let mut offset = HEADER_LEN;
    for _ in 0..count {
        if payload.len().saturating_sub(offset) < KEY_ID_LEN {
            return None;
        }
        let key_id = read_u32(payload, offset);
        offset += KEY_ID_LEN;
        let (value, consumed) = parse_prop_value(payload, offset)?;
        offset += consumed;
        props.push((key_id, value));
    }

    Some(AddEdgePropsData {
        src,
        etype,
        dst,
        props,
    })
}
/// Decodes a batch of edges with properties: edge count (4 bytes), then per
/// edge source id (8 bytes), edge type id (4 bytes), destination id
/// (8 bytes), property count (4 bytes) and, per property, a key id (4 bytes)
/// followed by a value decoded by `parse_prop_value`.
///
/// Returns `None` when the payload ends before all declared edges and
/// properties are read, or when `parse_prop_value` rejects a value.
///
/// Both counts come from the payload, so each up-front allocation is capped
/// by the number of minimal entries that could fit in the remaining bytes
/// instead of trusting the declared count.
pub fn parse_add_edges_props_batch_payload(payload: &[u8]) -> Option<Vec<AddEdgePropsData>> {
    const COUNT_LEN: usize = 4;
    const EDGE_HEADER_LEN: usize = 8 + 4 + 8 + 4;
    const KEY_ID_LEN: usize = 4;
    const MIN_PROP_LEN: usize = KEY_ID_LEN + 1;

    if payload.len() < COUNT_LEN {
        return None;
    }
    let count = read_u32(payload, 0) as usize;

    let max_edges = (payload.len() - COUNT_LEN) / EDGE_HEADER_LEN;
    let mut edges = Vec::with_capacity(count.min(max_edges));

    let mut offset = COUNT_LEN;
    for _ in 0..count {
        if payload.len().saturating_sub(offset) < EDGE_HEADER_LEN {
            return None;
        }
        let src = read_u64(payload, offset);
        let etype = read_u32(payload, offset + 8);
        let dst = read_u64(payload, offset + 12);
        let prop_count = read_u32(payload, offset + 20) as usize;
        offset += EDGE_HEADER_LEN;

        let max_props = payload.len().saturating_sub(offset) / MIN_PROP_LEN;
        let mut props = Vec::with_capacity(prop_count.min(max_props));
        for _ in 0..prop_count {
            if payload.len().saturating_sub(offset) < KEY_ID_LEN {
                return None;
            }
            let key_id = read_u32(payload, offset);
            offset += KEY_ID_LEN;
            let (value, consumed) = parse_prop_value(payload, offset)?;
            offset += consumed;
            props.push((key_id, value));
        }

        edges.push(AddEdgePropsData {
            src,
            etype,
            dst,
            props,
        });
    }
    Some(edges)
}
/// Decodes a delete-edge payload by delegating to `parse_add_edge_payload`;
/// the result is therefore an `AddEdgeData` triple.
pub fn parse_delete_edge_payload(payload: &[u8]) -> Option<AddEdgeData> {
parse_add_edge_payload(payload)
}
/// Decoded id-to-name definition; also returned by the edge-type and
/// property-key definition parsers.
#[derive(Debug, Clone)]
pub struct DefineLabelData {
/// The defined id.
pub label_id: LabelId,
/// The name bound to the id.
pub name: String,
}
/// Decodes an id-to-name definition: id (4 bytes), name length (4 bytes),
/// name bytes.
///
/// Returns `None` when the payload is shorter than the 8-byte header, when
/// fewer than the declared number of name bytes follow, or when the name is
/// not valid UTF-8.
pub fn parse_define_label_payload(payload: &[u8]) -> Option<DefineLabelData> {
    const NAME_OFFSET: usize = 4 + 4;

    if payload.len() < NAME_OFFSET {
        return None;
    }
    let label_id = read_u32(payload, 0);
    let name_len = read_u32(payload, 4) as usize;

    let raw_name = payload[NAME_OFFSET..].get(..name_len)?;
    let name = String::from_utf8(raw_name.to_vec()).ok()?;
    Some(DefineLabelData { label_id, name })
}
/// Decodes an edge-type definition by delegating to
/// `parse_define_label_payload`; the id is returned in the `label_id` field.
pub fn parse_define_etype_payload(payload: &[u8]) -> Option<DefineLabelData> {
parse_define_label_payload(payload)
}
/// Decodes a property-key definition by delegating to
/// `parse_define_label_payload`; the id is returned in the `label_id` field.
pub fn parse_define_propkey_payload(payload: &[u8]) -> Option<DefineLabelData> {
parse_define_label_payload(payload)
}
/// Decoded node/label pair; also returned by `parse_remove_node_label_payload`.
#[derive(Debug, Clone)]
pub struct AddNodeLabelData {
/// Id of the node.
pub node_id: NodeId,
/// Id of the label.
pub label_id: LabelId,
}
/// Decodes a node/label pair: node id (8 bytes) followed by label id
/// (4 bytes).
///
/// Returns `None` when the payload is shorter than 12 bytes; extra trailing
/// bytes are ignored.
pub fn parse_add_node_label_payload(payload: &[u8]) -> Option<AddNodeLabelData> {
    const MIN_LEN: usize = 8 + 4;

    if payload.len() >= MIN_LEN {
        let node_id = read_u64(payload, 0);
        let label_id = read_u32(payload, 8);
        Some(AddNodeLabelData { node_id, label_id })
    } else {
        None
    }
}
/// Decodes a remove-node-label payload by delegating to
/// `parse_add_node_label_payload`.
pub fn parse_remove_node_label_payload(payload: &[u8]) -> Option<AddNodeLabelData> {
parse_add_node_label_payload(payload)
}
/// Decodes one property value starting at `offset` in `payload`.
///
/// On success returns the value together with the number of bytes it
/// occupies (tag included). The encoding is a 1-byte tag interpreted by
/// `PropValueTag::from_u8`, followed by:
/// - nothing for `Null`,
/// - one byte for `Bool` (non-zero means `true`),
/// - 8 bytes for `I64` / `F64`,
/// - a 4-byte byte length plus UTF-8 data for `String`,
/// - a 4-byte element count plus little-endian `f32`s for `VectorF32`.
///
/// Returns `None` when `offset` is at or past the end of the payload, the tag
/// is unknown, the value is truncated, or string data is not valid UTF-8.
///
/// Bounds are checked on the sub-slice that starts at `offset`, so lengths
/// taken from the payload are never added to `offset` before validation and
/// cannot overflow.
fn parse_prop_value(payload: &[u8], offset: usize) -> Option<(PropValue, usize)> {
    const TAG_LEN: usize = 1;
    const LEN_PREFIX: usize = 4;
    const SCALAR_LEN: usize = TAG_LEN + 8;
    const DATA_OFFSET: usize = TAG_LEN + LEN_PREFIX;

    let rest = payload.get(offset..)?;
    let tag = *rest.first()?;

    match PropValueTag::from_u8(tag)? {
        PropValueTag::Null => Some((PropValue::Null, TAG_LEN)),
        PropValueTag::Bool => {
            let raw = *rest.get(1)?;
            Some((PropValue::Bool(raw != 0), 2))
        }
        PropValueTag::I64 => {
            if rest.len() < SCALAR_LEN {
                return None;
            }
            Some((PropValue::I64(read_i64(payload, offset + 1)), SCALAR_LEN))
        }
        PropValueTag::F64 => {
            if rest.len() < SCALAR_LEN {
                return None;
            }
            Some((PropValue::F64(read_f64(payload, offset + 1)), SCALAR_LEN))
        }
        PropValueTag::String => {
            if rest.len() < DATA_OFFSET {
                return None;
            }
            let str_len = read_u32(payload, offset + 1) as usize;
            let raw = rest[DATA_OFFSET..].get(..str_len)?;
            let text = String::from_utf8(raw.to_vec()).ok()?;
            Some((PropValue::String(text), DATA_OFFSET + str_len))
        }
        PropValueTag::VectorF32 => {
            if rest.len() < DATA_OFFSET {
                return None;
            }
            let dimensions = read_u32(payload, offset + 1) as usize;
            let byte_len = dimensions.checked_mul(4)?;
            let raw = rest[DATA_OFFSET..].get(..byte_len)?;
            let vector = raw
                .chunks_exact(4)
                .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
                .collect();
            Some((PropValue::VectorF32(vector), DATA_OFFSET + byte_len))
        }
    }
}
/// Decoded set-node-property payload.
#[derive(Debug, Clone)]
pub struct SetNodePropData {
/// Id of the node.
pub node_id: NodeId,
/// Id of the property key.
pub key_id: PropKeyId,
/// Decoded property value.
pub value: PropValue,
}
/// Decodes a set-node-property payload: node id (8 bytes), property key id
/// (4 bytes), then a value decoded by `parse_prop_value`.
///
/// Returns `None` when the payload is shorter than 12 bytes or the value
/// cannot be decoded. Bytes after the value are ignored.
pub fn parse_set_node_prop_payload(payload: &[u8]) -> Option<SetNodePropData> {
    const VALUE_OFFSET: usize = 8 + 4;

    if payload.len() < VALUE_OFFSET {
        return None;
    }
    let node_id = read_u64(payload, 0);
    let key_id = read_u32(payload, 8);

    parse_prop_value(payload, VALUE_OFFSET).map(|(value, _consumed)| SetNodePropData {
        node_id,
        key_id,
        value,
    })
}
/// Decoded delete-node-property payload.
#[derive(Debug, Clone)]
pub struct DelNodePropData {
/// Id of the node.
pub node_id: NodeId,
/// Id of the property key.
pub key_id: PropKeyId,
}
/// Decodes a delete-node-property payload: node id (8 bytes) followed by the
/// property key id (4 bytes).
///
/// Returns `None` when the payload is shorter than 12 bytes; extra trailing
/// bytes are ignored.
pub fn parse_del_node_prop_payload(payload: &[u8]) -> Option<DelNodePropData> {
    const MIN_LEN: usize = 8 + 4;

    if payload.len() >= MIN_LEN {
        let node_id = read_u64(payload, 0);
        let key_id = read_u32(payload, 8);
        Some(DelNodePropData { node_id, key_id })
    } else {
        None
    }
}
/// Decoded set-node-vector payload.
#[derive(Debug, Clone)]
pub struct SetNodeVectorData {
/// Id of the node.
pub node_id: NodeId,
/// Id of the property key.
pub prop_key_id: PropKeyId,
/// Element count as stored in the payload; `parse_set_node_vector_payload`
/// sets it to the same value as `vector.len()`.
pub dimensions: usize,
/// The vector elements.
pub vector: Vec<f32>,
}
/// Decodes a set-node-vector payload: node id (8 bytes), property key id
/// (4 bytes), element count (4 bytes), then that many little-endian `f32`s.
///
/// Returns `None` when the payload is shorter than the 16-byte header or does
/// not contain all declared elements; extra trailing bytes are ignored.
///
/// The byte length is computed with a checked multiplication and compared
/// against the bytes that follow the header, so a corrupt element count
/// cannot overflow the bounds check.
pub fn parse_set_node_vector_payload(payload: &[u8]) -> Option<SetNodeVectorData> {
    const HEADER_LEN: usize = 8 + 4 + 4;
    const ELEMENT_LEN: usize = 4;

    if payload.len() < HEADER_LEN {
        return None;
    }
    let node_id = read_u64(payload, 0);
    let prop_key_id = read_u32(payload, 8);
    let dimensions = read_u32(payload, 12) as usize;

    let byte_len = dimensions.checked_mul(ELEMENT_LEN)?;
    let raw = payload[HEADER_LEN..].get(..byte_len)?;
    let vector = raw
        .chunks_exact(ELEMENT_LEN)
        .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
        .collect();

    Some(SetNodeVectorData {
        node_id,
        prop_key_id,
        dimensions,
        vector,
    })
}
/// Decoded delete-node-vector payload.
#[derive(Debug, Clone)]
pub struct DelNodeVectorData {
/// Id of the node.
pub node_id: NodeId,
/// Id of the property key.
pub prop_key_id: PropKeyId,
}
/// Decodes a delete-node-vector payload: node id (8 bytes) followed by the
/// property key id (4 bytes).
///
/// Returns `None` when the payload is shorter than 12 bytes; extra trailing
/// bytes are ignored.
pub fn parse_del_node_vector_payload(payload: &[u8]) -> Option<DelNodeVectorData> {
    const MIN_LEN: usize = 8 + 4;

    if payload.len() >= MIN_LEN {
        let node_id = read_u64(payload, 0);
        let prop_key_id = read_u32(payload, 8);
        Some(DelNodeVectorData {
            node_id,
            prop_key_id,
        })
    } else {
        None
    }
}
/// Decoded set-edge-property payload.
#[derive(Debug, Clone)]
pub struct SetEdgePropData {
/// Source node id.
pub src: NodeId,
/// Edge type id.
pub etype: ETypeId,
/// Destination node id.
pub dst: NodeId,
/// Id of the property key.
pub key_id: PropKeyId,
/// Decoded property value.
pub value: PropValue,
}
/// Decodes a set-edge-property payload: source id (8 bytes), edge type id
/// (4 bytes), destination id (8 bytes), property key id (4 bytes), then a
/// value decoded by `parse_prop_value`.
///
/// Returns `None` when the payload is shorter than 24 bytes or the value
/// cannot be decoded. Bytes after the value are ignored.
pub fn parse_set_edge_prop_payload(payload: &[u8]) -> Option<SetEdgePropData> {
    const VALUE_OFFSET: usize = 8 + 4 + 8 + 4;

    if payload.len() < VALUE_OFFSET {
        return None;
    }
    let src = read_u64(payload, 0);
    let etype = read_u32(payload, 8);
    let dst = read_u64(payload, 12);
    let key_id = read_u32(payload, 20);

    parse_prop_value(payload, VALUE_OFFSET).map(|(value, _consumed)| SetEdgePropData {
        src,
        etype,
        dst,
        key_id,
        value,
    })
}
/// Decoded set-edge-properties payload.
#[derive(Debug, Clone)]
pub struct SetEdgePropsData {
/// Source node id.
pub src: NodeId,
/// Edge type id.
pub etype: ETypeId,
/// Destination node id.
pub dst: NodeId,
/// Property key id / value pairs, in payload order.
pub props: Vec<(PropKeyId, PropValue)>,
}
/// Decodes a set-edge-properties payload: source id (8 bytes), edge type id
/// (4 bytes), destination id (8 bytes), property count (4 bytes), then per
/// property a key id (4 bytes) and a value decoded by `parse_prop_value`.
///
/// Returns `None` when the payload is shorter than the 24-byte header, ends
/// before all declared properties are read, or contains a value that
/// `parse_prop_value` rejects.
///
/// The declared count comes from the payload, so the up-front allocation is
/// capped by the number of minimal properties (4-byte key id + 1-byte value)
/// that could fit in the remaining bytes.
pub fn parse_set_edge_props_payload(payload: &[u8]) -> Option<SetEdgePropsData> {
    const HEADER_LEN: usize = 8 + 4 + 8 + 4;
    const KEY_ID_LEN: usize = 4;
    const MIN_PROP_LEN: usize = KEY_ID_LEN + 1;

    if payload.len() < HEADER_LEN {
        return None;
    }
    let src = read_u64(payload, 0);
    let etype = read_u32(payload, 8);
    let dst = read_u64(payload, 12);
    let count = read_u32(payload, 20) as usize;

    let max_props = (payload.len() - HEADER_LEN) / MIN_PROP_LEN;
    let mut props = Vec::with_capacity(count.min(max_props));

    let mut offset = HEADER_LEN;
    for _ in 0..count {
        if payload.len().saturating_sub(offset) < KEY_ID_LEN {
            return None;
        }
        let key_id = read_u32(payload, offset);
        offset += KEY_ID_LEN;
        let (value, consumed) = parse_prop_value(payload, offset)?;
        offset += consumed;
        props.push((key_id, value));
    }

    Some(SetEdgePropsData {
        src,
        etype,
        dst,
        props,
    })
}
/// Decoded delete-edge-property payload.
#[derive(Debug, Clone)]
pub struct DelEdgePropData {
/// Source node id.
pub src: NodeId,
/// Edge type id.
pub etype: ETypeId,
/// Destination node id.
pub dst: NodeId,
/// Id of the property key.
pub key_id: PropKeyId,
}
/// Decodes a delete-edge-property payload: source id (8 bytes), edge type id
/// (4 bytes), destination id (8 bytes), property key id (4 bytes).
///
/// Returns `None` when the payload is shorter than 24 bytes; extra trailing
/// bytes are ignored.
pub fn parse_del_edge_prop_payload(payload: &[u8]) -> Option<DelEdgePropData> {
    const MIN_LEN: usize = 8 + 4 + 8 + 4;

    if payload.len() >= MIN_LEN {
        let src = read_u64(payload, 0);
        let etype = read_u32(payload, 8);
        let dst = read_u64(payload, 12);
        let key_id = read_u32(payload, 20);
        Some(DelEdgePropData {
            src,
            etype,
            dst,
            key_id,
        })
    } else {
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A record produced by `WalRecord::build` parses back to the same type,
    /// transaction id and payload.
    #[test]
    fn test_wal_record_roundtrip() {
        let payload = build_create_node_payload(123, Some("test_key"));
        let encoded = WalRecord::new(WalRecordType::CreateNode, 42, payload).build();

        let decoded = parse_wal_record(&encoded, 0).expect("expected value");
        assert_eq!(decoded.record_type, WalRecordType::CreateNode);
        assert_eq!(decoded.txid, 42);

        let node = parse_create_node_payload(&decoded.payload).expect("expected value");
        assert_eq!(node.node_id, 123);
        assert_eq!(node.key, Some("test_key".to_string()));
    }

    /// A single edge triple survives an encode/decode round trip.
    #[test]
    fn test_edge_payload() {
        let encoded = build_add_edge_payload(1, 100, 2);
        let edge = parse_add_edge_payload(&encoded).expect("expected value");
        assert_eq!(edge.src, 1);
        assert_eq!(edge.etype, 100);
        assert_eq!(edge.dst, 2);
    }

    /// Keyed and key-less entries both round-trip through the batch encoding.
    #[test]
    fn test_create_nodes_batch_payload() {
        let entries = vec![(1, Some("a")), (2, None), (3, Some("ccc"))];
        let encoded = build_create_nodes_batch_payload(&entries);
        let decoded = parse_create_nodes_batch_payload(&encoded).expect("expected value");

        assert_eq!(decoded.len(), 3);
        for (node, (node_id, key)) in decoded.iter().zip(entries.iter()) {
            assert_eq!(node.node_id, *node_id);
            assert_eq!(node.key, key.map(|k| k.to_string()));
        }
    }

    /// Every edge of a batch is decoded in order.
    #[test]
    fn test_add_edges_batch_payload() {
        let edges = vec![(1, 10, 2), (2, 11, 3)];
        let encoded = build_add_edges_batch_payload(&edges);
        let decoded = parse_add_edges_batch_payload(&encoded).expect("expected value");

        assert_eq!(decoded.len(), 2);
        for (edge, &(src, etype, dst)) in decoded.iter().zip(edges.iter()) {
            assert_eq!(edge.src, src);
            assert_eq!(edge.etype, etype);
            assert_eq!(edge.dst, dst);
        }
    }

    /// Edges with differing numbers of properties round-trip through the batch encoding.
    #[test]
    fn test_add_edges_props_batch_payload() {
        let edges = vec![
            (1, 2, 3, vec![(10, PropValue::I64(7))]),
            (
                4,
                5,
                6,
                vec![
                    (11, PropValue::String("v".into())),
                    (12, PropValue::Bool(true)),
                ],
            ),
        ];
        let encoded = build_add_edges_props_batch_payload(&edges);
        let decoded = parse_add_edges_props_batch_payload(&encoded).expect("expected value");

        assert_eq!(decoded.len(), 2);

        let first = &decoded[0];
        assert_eq!(first.src, 1);
        assert_eq!(first.etype, 2);
        assert_eq!(first.dst, 3);
        assert_eq!(first.props.len(), 1);
        assert_eq!(first.props[0].0, 10);
        assert_eq!(first.props[0].1, PropValue::I64(7));

        let second = &decoded[1];
        assert_eq!(second.src, 4);
        assert_eq!(second.etype, 5);
        assert_eq!(second.dst, 6);
        assert_eq!(second.props.len(), 2);
        assert_eq!(second.props[0].0, 11);
        assert_eq!(second.props[0].1, PropValue::String("v".into()));
        assert_eq!(second.props[1].0, 12);
        assert_eq!(second.props[1].1, PropValue::Bool(true));
    }

    /// An edge with an integer and a string property round-trips.
    #[test]
    fn test_add_edge_props_payload() {
        let props = vec![(10, PropValue::I64(7)), (11, PropValue::String("v".into()))];
        let encoded = build_add_edge_props_payload(1, 2, 3, &props);
        let decoded = parse_add_edge_props_payload(&encoded).expect("expected value");

        assert_eq!(decoded.src, 1);
        assert_eq!(decoded.etype, 2);
        assert_eq!(decoded.dst, 3);
        assert_eq!(decoded.props.len(), 2);
        assert_eq!(decoded.props[0].0, 10);
        assert_eq!(decoded.props[0].1, PropValue::I64(7));
        assert_eq!(decoded.props[1].0, 11);
        assert_eq!(decoded.props[1].1, PropValue::String("v".into()));
    }

    /// The set-edge-properties encoding round-trips as well.
    #[test]
    fn test_set_edge_props_payload() {
        let props = vec![
            (10, PropValue::I64(42)),
            (11, PropValue::String("ok".into())),
        ];
        let encoded = build_set_edge_props_payload(1, 2, 3, &props);
        let decoded = parse_set_edge_props_payload(&encoded).expect("expected value");

        assert_eq!(decoded.src, 1);
        assert_eq!(decoded.etype, 2);
        assert_eq!(decoded.dst, 3);
        assert_eq!(decoded.props.len(), 2);
        assert_eq!(decoded.props[0].0, 10);
        assert_eq!(decoded.props[0].1, PropValue::I64(42));
        assert_eq!(decoded.props[1].0, 11);
        assert_eq!(decoded.props[1].1, PropValue::String("ok".into()));
    }

    /// String property values round-trip inside a set-node-property payload.
    #[test]
    fn test_prop_value_string() {
        let original = PropValue::String("hello world".to_string());
        let encoded = build_set_node_prop_payload(42, 5, &original);
        let decoded = parse_set_node_prop_payload(&encoded).expect("expected value");

        assert_eq!(decoded.node_id, 42);
        assert_eq!(decoded.key_id, 5);
        assert_eq!(decoded.value, PropValue::String("hello world".to_string()));
    }

    /// Negative integers keep their sign through the encoding.
    #[test]
    fn test_prop_value_i64() {
        let original = PropValue::I64(-12345);
        let encoded = build_set_node_prop_payload(1, 2, &original);
        let decoded = parse_set_node_prop_payload(&encoded).expect("expected value");

        assert_eq!(decoded.value, PropValue::I64(-12345));
    }

    /// A float vector round-trips together with its dimension count.
    #[test]
    fn test_vector_payload() {
        let vector = vec![1.0, 2.0, 3.0, 4.0];
        let encoded = build_set_node_vector_payload(100, 10, &vector);
        let decoded = parse_set_node_vector_payload(&encoded).expect("expected value");

        assert_eq!(decoded.node_id, 100);
        assert_eq!(decoded.prop_key_id, 10);
        assert_eq!(decoded.dimensions, 4);
        assert_eq!(decoded.vector, vector);
    }
}