use super::compact_record::CompactEdgeRecord;
use crate::backend::native::{NativeBackendError, NativeResult};
pub fn serialize_cluster(
edges: &[CompactEdgeRecord],
serialized_size: usize,
) -> NativeResult<Vec<u8>> {
let expected_total_size = 8 + serialized_size;
let mut buffer = Vec::with_capacity(expected_total_size);
buffer.extend_from_slice(&(edges.len() as u32).to_be_bytes());
buffer.extend_from_slice(&(serialized_size as u32).to_be_bytes());
if std::env::var("V2_CLUSTER_AUDIT").is_ok() {
println!(
"[V2_CLUSTER_AUDIT] {}:serialize(): file:{} line={}, edge_count={}, payload_size={}, expected_total={}",
std::module_path!(),
file!(),
line!(),
edges.len(),
serialized_size,
expected_total_size
);
}
if !edges.is_empty() {
let mut cursor = 8;
for (edge_index, edge) in edges.iter().enumerate() {
if std::env::var("V2_CLUSTER_AUDIT").is_ok() {
println!(
"[V2_CLUSTER_AUDIT] {}:serialize(): file:{} line={}, edge_index={}, edge_size={}, cursor={}",
std::module_path!(),
file!(),
line!(),
edge_index,
edge.size_bytes(),
cursor
);
}
let edge_bytes = edge.serialize();
cursor += edge_bytes.len();
buffer.extend_from_slice(&edge_bytes);
}
if cursor != 8 + serialized_size {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"serialize(): cursor mismatch: {}, expected {}",
cursor,
8 + serialized_size
),
});
}
}
if buffer.len() != expected_total_size {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"serialize(): final buffer size mismatch: actual {}, expected {}",
buffer.len(),
expected_total_size
),
});
}
Ok(buffer)
}
pub fn verify_serialized_layout(bytes: &[u8]) -> NativeResult<()> {
if bytes.len() < 8 {
return Err(NativeBackendError::BufferTooSmall {
size: bytes.len(),
min_size: 8,
});
}
let edge_count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
let payload_size = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]) as usize;
let expected_total = 8 + payload_size;
if bytes.len() != expected_total {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"verify_serialized_layout(): size mismatch: expected {}, actual {}",
expected_total,
bytes.len()
),
});
}
let mut cursor = 8;
for edge_index in 0..edge_count {
if cursor + 8 > bytes.len() {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"verify_serialized_layout(): truncated edge header at edge {}, cursor {}",
edge_index, cursor
),
});
}
let neighbor_id_bytes = &bytes[cursor..cursor + 8];
let _neighbor_id = i64::from_be_bytes(neighbor_id_bytes.try_into().unwrap());
cursor += 8;
cursor += 2;
let data_len_bytes = &bytes[cursor..cursor + 2];
let data_len = u16::from_be_bytes(data_len_bytes.try_into().unwrap()) as usize;
cursor += 2;
if data_len > 10000 {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"verify_serialized_layout(): edge data too large: {} bytes at edge {}",
data_len, edge_index
),
});
}
cursor += data_len;
if cursor > bytes.len() {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"verify_serialized_layout(): buffer overrun at edge {}, cursor {}",
edge_index, cursor
),
});
}
}
if cursor != expected_total {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"verify_serialized_layout(): cursor mismatch: expected {}, actual {}",
expected_total, cursor
),
});
}
Ok(())
}
pub fn deserialize_cluster(bytes: &[u8]) -> NativeResult<(Vec<CompactEdgeRecord>, usize)> {
#[cfg(feature = "trace_v2_io")]
{
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
for byte in bytes {
byte.hash(&mut hasher);
}
let hash_val = hasher.finish();
println!(
"[V2_CLUSTER_AUDIT] {}:deserialize(): file:{} line={}, bytes_len={}, hash={:x}",
std::module_path!(),
file!(),
line!(),
bytes.len(),
hash_val
);
}
if bytes.len() < 8 {
return Err(NativeBackendError::BufferTooSmall {
size: bytes.len(),
min_size: 8,
});
}
let edge_count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
let payload_size = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]) as usize;
let expected_total = 8 + payload_size;
if bytes.len() != expected_total {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"deserialize(): SIZE_MISMATCH file={} line={} actual={}, expected={}, diff={}, payload_size_from_header={}",
file!(),
line!(),
bytes.len(),
expected_total,
bytes.len() as isize - expected_total as isize,
payload_size
),
});
}
let mut edges = Vec::with_capacity(edge_count);
let mut cursor = 8;
for edge_index in 0..edge_count {
if cursor > bytes.len() {
let remaining = bytes.len() - cursor;
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"deserialize(): edge_index={}, cursor={}, remaining={}",
edge_index, cursor, remaining
),
});
}
let record = match CompactEdgeRecord::deserialize(&bytes[cursor..]) {
Ok(rec) => rec,
Err(e) => {
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"deserialize(): edge_index={}, cursor={}, error={:?}, bytes={:02X?}",
edge_index,
cursor,
e,
&bytes[cursor..cursor.saturating_add(20)]
),
});
}
};
let next_cursor = cursor + record.size_bytes();
if next_cursor > bytes.len() {
let current_payload_size = payload_size;
let new_payload_size = (next_cursor - 8) as u32;
return Err(NativeBackendError::CorruptNodeRecord {
node_id: -1,
reason: format!(
"deserialize(): edge_index={}, cursor={}, edge_size={}, next_cursor={}, bytes_len={}, payload_size={} (current) -> {} (new)",
edge_index,
cursor,
record.size_bytes(),
next_cursor,
bytes.len(),
current_payload_size,
new_payload_size
),
});
}
cursor = next_cursor;
edges.push(record);
}
Ok((edges, payload_size))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::v2::edge_cluster::cluster_trace::Direction;
use crate::backend::native::v2::string_table::StringTable;
use crate::backend::native::{EdgeFlags, EdgeRecord};
fn create_test_edge_record(from_id: i64, to_id: i64, edge_type: &str) -> EdgeRecord {
EdgeRecord {
id: 1,
from_id,
to_id,
edge_type: edge_type.to_string(),
flags: EdgeFlags::empty(),
data: serde_json::json!([1, 2, 3]),
}
}
#[test]
fn test_serialize_empty_cluster() {
let edges: Vec<CompactEdgeRecord> = vec![];
let result = serialize_cluster(&edges, 0);
assert!(result.is_ok());
let serialized = result.unwrap();
assert_eq!(serialized.len(), 8); assert_eq!(serialized[0..4], [0, 0, 0, 0]); assert_eq!(serialized[4..8], [0, 0, 0, 0]); }
#[test]
fn test_serialize_single_edge() {
let mut string_table = StringTable::new();
let edge_records = vec![create_test_edge_record(1, 2, "test")];
let compact_edges = edge_records
.iter()
.map(|e| {
CompactEdgeRecord::from_edge_record(e, Direction::Outgoing, &mut string_table)
.unwrap()
})
.collect::<Vec<_>>();
let serialized_size = compact_edges.iter().map(|c| c.size_bytes()).sum();
let result = serialize_cluster(&compact_edges, serialized_size);
assert!(result.is_ok());
let serialized = result.unwrap();
assert_eq!(serialized.len(), 8 + serialized_size);
assert_eq!(serialized[0..4], [0, 0, 0, 1]); }
#[test]
fn test_verify_valid_layout() {
let mut bytes = vec![0u8; 8];
bytes[0..4].copy_from_slice(&1u32.to_be_bytes()); bytes[4..8].copy_from_slice(&10u32.to_be_bytes());
assert!(verify_serialized_layout(&bytes).is_err());
}
#[test]
fn test_verify_empty_cluster() {
let bytes = vec![0u8; 8];
let result = verify_serialized_layout(&bytes);
assert!(result.is_ok());
}
#[test]
fn test_verify_truncated_header() {
let bytes = vec![1u8; 4];
let result = verify_serialized_layout(&bytes);
assert!(result.is_err());
}
#[test]
fn test_deserialize_empty_cluster() {
let bytes = vec![0u8; 8];
let result = deserialize_cluster(&bytes);
assert!(result.is_ok());
let (edges, payload_size) = result.unwrap();
assert_eq!(edges.len(), 0);
assert_eq!(payload_size, 0);
}
#[test]
fn test_round_trip_serialization() {
let mut string_table = StringTable::new();
let edge_records = vec![
create_test_edge_record(1, 2, "type1"),
create_test_edge_record(1, 3, "type2"),
];
let compact_edges = edge_records
.iter()
.map(|e| {
CompactEdgeRecord::from_edge_record(e, Direction::Outgoing, &mut string_table)
.unwrap()
})
.collect::<Vec<_>>();
let serialized_size = compact_edges.iter().map(|c| c.size_bytes()).sum();
let serialized = serialize_cluster(&compact_edges, serialized_size).unwrap();
assert!(verify_serialized_layout(&serialized).is_ok());
let (deserialized_edges, deserialized_size) = deserialize_cluster(&serialized).unwrap();
assert_eq!(deserialized_edges.len(), compact_edges.len());
assert_eq!(deserialized_size, serialized_size);
}
}