use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::types::{NativeBackendError, NativeNodeId, NativeResult};
use crate::backend::native::v2::edge_cluster::cluster::EdgeCluster;
use std::time::Instant;
/// Timing and volume counters accumulated by `SequentialClusterReader`
/// across repeated read/extract calls.
#[derive(Debug, Clone, Default)]
pub struct SequentialClusterReaderMetrics {
    /// Cumulative wall-clock time spent in `read_chain_clusters`, in nanoseconds.
    pub read_time_ns: u64,
    /// Total number of bytes read across all successful read calls.
    pub total_bytes_read: u64,
    /// Number of clusters covered by the reads performed so far.
    pub clusters_read: u32,
    /// Cumulative wall-clock time spent in `extract_neighbors`, in nanoseconds.
    pub extract_time_ns: u64,
    /// Number of successful `extract_neighbors` calls.
    pub extract_count: u32,
}
impl SequentialClusterReaderMetrics {
    /// Creates zeroed metrics; identical to `Default::default()`.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Total read time, converted to milliseconds.
    #[inline]
    pub fn read_time_ms(&self) -> f64 {
        Self::ns_to_ms(self.read_time_ns)
    }

    /// Total extraction time, converted to milliseconds.
    #[inline]
    pub fn extract_time_ms(&self) -> f64 {
        Self::ns_to_ms(self.extract_time_ns)
    }

    /// Shared nanoseconds-to-milliseconds conversion for the accessors above.
    #[inline]
    fn ns_to_ms(nanos: u64) -> f64 {
        nanos as f64 / 1_000_000.0
    }
}
/// Upper bound (512 KiB) on the combined size of a cluster chain read in a
/// single call; larger requests fail with `RecordTooLarge` before any I/O.
const MAX_CLUSTER_BUFFER_SIZE: usize = 512 * 1024;
/// Reads a chain of contiguous edge clusters with one sequential I/O call
/// and deserializes individual clusters out of the resulting buffer.
pub struct SequentialClusterReader {
    /// Accumulated timing and volume counters for reads and extractions.
    pub metrics: SequentialClusterReaderMetrics,
}
impl SequentialClusterReader {
    /// Creates a reader with zeroed metrics.
    #[inline]
    pub fn new() -> Self {
        Self {
            metrics: SequentialClusterReaderMetrics::new(),
        }
    }

    /// Reads a run of edge clusters in a single sequential I/O call.
    ///
    /// `cluster_offsets` lists `(file_offset, size)` pairs that must be
    /// contiguous and in ascending order: each cluster must start exactly
    /// where the previous one ends. The entire span is read starting at the
    /// first offset, and `extract_neighbors` locates clusters by cumulative
    /// size, so a gap or reordering would silently yield wrong bytes — such
    /// inputs are rejected up front instead.
    ///
    /// # Errors
    /// - `InvalidParameter` if `cluster_offsets` is empty or not contiguous.
    /// - `RecordTooLarge` if the combined size exceeds `MAX_CLUSTER_BUFFER_SIZE`.
    /// - Any I/O error propagated from `GraphFile::read_bytes`.
    pub fn read_chain_clusters(
        &mut self,
        graph_file: &mut GraphFile,
        cluster_offsets: &[(u64, u32)],
    ) -> NativeResult<Vec<u8>> {
        let start = Instant::now();
        if cluster_offsets.is_empty() {
            return Err(NativeBackendError::InvalidParameter {
                context: "cluster_offsets is empty".to_string(),
                source: None,
            });
        }
        // The single-read strategy is only correct when the clusters form one
        // unbroken byte range; validate instead of silently misreading.
        for window in cluster_offsets.windows(2) {
            let (prev_offset, prev_size) = window[0];
            let (next_offset, _) = window[1];
            if prev_offset + prev_size as u64 != next_offset {
                return Err(NativeBackendError::InvalidParameter {
                    context: format!(
                        "cluster_offsets are not contiguous: cluster at offset {} (size {}) does not end at next offset {}",
                        prev_offset, prev_size, next_offset
                    ),
                    source: None,
                });
            }
        }
        let total_size: u64 = cluster_offsets.iter().map(|(_, size)| *size as u64).sum();
        if total_size > MAX_CLUSTER_BUFFER_SIZE as u64 {
            return Err(NativeBackendError::RecordTooLarge {
                // Saturate rather than truncate: the sum of many u32 sizes can
                // exceed u32::MAX, and `as u32` would report a misleading size.
                size: u32::try_from(total_size).unwrap_or(u32::MAX),
                max_size: MAX_CLUSTER_BUFFER_SIZE as u32,
            });
        }
        let start_offset = cluster_offsets[0].0;
        let mut buffer = vec![0u8; total_size as usize];
        graph_file.read_bytes(start_offset, &mut buffer)?;
        self.metrics.read_time_ns += start.elapsed().as_nanos() as u64;
        self.metrics.total_bytes_read += total_size;
        self.metrics.clusters_read += cluster_offsets.len() as u32;
        Ok(buffer)
    }

    /// Deserializes the cluster at `cluster_index` out of a buffer previously
    /// returned by [`Self::read_chain_clusters`] and collects its neighbor
    /// node ids.
    ///
    /// # Errors
    /// - `InvalidParameter` if `cluster_index` is out of bounds or `buffer`
    ///   is too small to contain the requested cluster.
    /// - Any deserialization error from `EdgeCluster::deserialize`.
    pub fn extract_neighbors(
        &mut self,
        buffer: &[u8],
        cluster_index: usize,
        cluster_offsets: &[(u64, u32)],
    ) -> NativeResult<Vec<NativeNodeId>> {
        let start = Instant::now();
        if cluster_index >= cluster_offsets.len() {
            return Err(NativeBackendError::InvalidParameter {
                context: format!(
                    "cluster_index {} out of bounds (len: {})",
                    cluster_index,
                    cluster_offsets.len()
                ),
                source: None,
            });
        }
        // The cluster's position inside `buffer` is the sum of the sizes of
        // all clusters that precede it (the read is a contiguous span).
        let byte_offset: usize = cluster_offsets[..cluster_index]
            .iter()
            .map(|(_, size)| *size as usize)
            .sum();
        let cluster_size = cluster_offsets[cluster_index].1 as usize;
        // Checked slice: a buffer shorter than the offsets claim must surface
        // as an error, not an out-of-bounds panic.
        let cluster_bytes = buffer
            .get(byte_offset..byte_offset + cluster_size)
            .ok_or_else(|| NativeBackendError::InvalidParameter {
                context: format!(
                    "buffer too small: need bytes {}..{} but buffer length is {}",
                    byte_offset,
                    byte_offset + cluster_size,
                    buffer.len()
                ),
                source: None,
            })?;
        let cluster = EdgeCluster::deserialize(cluster_bytes)?;
        let neighbors: Vec<NativeNodeId> = cluster
            .iter_neighbors()
            .map(|id| id as NativeNodeId)
            .collect();
        self.metrics.extract_time_ns += start.elapsed().as_nanos() as u64;
        self.metrics.extract_count += 1;
        Ok(neighbors)
    }
}
impl Default for SequentialClusterReader {
#[inline]
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::native::EdgeRecord;
    use crate::backend::native::types::EdgeFlags;
    use crate::backend::native::v2::edge_cluster::cluster::EdgeCluster;
    use crate::backend::native::v2::edge_cluster::cluster_trace::Direction;
    use crate::backend::native::v2::string_table::StringTable;

    /// In-memory stand-in for `GraphFile`: serves reads out of a byte vector.
    struct MockGraphFile {
        data: Vec<u8>,
    }

    impl MockGraphFile {
        fn new(data: Vec<u8>) -> Self {
            Self { data }
        }

        /// Mirrors `GraphFile::read_bytes`: fills `buffer` starting at
        /// `offset`, failing with an `UnexpectedEof` I/O error when the read
        /// would run past the end of the backing data.
        fn read_bytes(&mut self, offset: u64, buffer: &mut [u8]) -> NativeResult<()> {
            let start = offset as usize;
            let end = start + buffer.len();
            if end > self.data.len() {
                return Err(NativeBackendError::Io(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "Read past end of mock data",
                )));
            }
            buffer.copy_from_slice(&self.data[start..end]);
            Ok(())
        }
    }

    /// Builds one serialized edge cluster whose outgoing edges point at the
    /// given neighbor node ids (all from node 1).
    fn create_test_cluster(neighbors: &[i64]) -> Vec<u8> {
        let mut string_table = StringTable::new();
        let edges: Vec<EdgeRecord> = neighbors
            .iter()
            .enumerate()
            .map(|(idx, &id)| EdgeRecord {
                id: idx as i64,
                from_id: 1,
                to_id: id,
                edge_type: "TEST".to_string(),
                data: serde_json::Value::Null,
                flags: EdgeFlags::empty(),
            })
            .collect();
        let cluster =
            EdgeCluster::create_from_edges(&edges, 1, Direction::Outgoing, &mut string_table)
                .expect("Failed to create cluster");
        cluster.serialize()
    }

    /// Lays the serialized clusters out back-to-back after 1 KiB of padding,
    /// matching the offsets produced by `contiguous_offsets(1024, ..)`.
    fn create_mock_data(clusters: &[Vec<u8>]) -> Vec<u8> {
        let mut data = Vec::new();
        data.extend_from_slice(&[0u8; 1024]);
        for cluster_data in clusters {
            data.extend_from_slice(cluster_data);
        }
        data
    }

    /// Computes `(offset, size)` pairs for clusters stored contiguously
    /// starting at `start`, removing the hand-computed offset tables the
    /// individual tests used to repeat.
    fn contiguous_offsets(start: u64, clusters: &[Vec<u8>]) -> Vec<(u64, u32)> {
        let mut offsets = Vec::with_capacity(clusters.len());
        let mut pos = start;
        for cluster in clusters {
            offsets.push((pos, cluster.len() as u32));
            pos += cluster.len() as u64;
        }
        offsets
    }

    /// Re-implements `read_chain_clusters` on top of `MockGraphFile`, since
    /// the production method takes a concrete `GraphFile` that cannot be
    /// constructed in unit tests. Validation mirrors the production path.
    fn read_with_mock(data: &[u8], cluster_offsets: &[(u64, u32)]) -> NativeResult<Vec<u8>> {
        if cluster_offsets.is_empty() {
            return Err(NativeBackendError::InvalidParameter {
                context: "cluster_offsets is empty".to_string(),
                source: None,
            });
        }
        let total_size: u64 = cluster_offsets.iter().map(|(_, size)| *size as u64).sum();
        if total_size > MAX_CLUSTER_BUFFER_SIZE as u64 {
            return Err(NativeBackendError::RecordTooLarge {
                size: total_size as u32,
                max_size: MAX_CLUSTER_BUFFER_SIZE as u32,
            });
        }
        let mut mock = MockGraphFile::new(data.to_vec());
        let mut buffer = vec![0u8; total_size as usize];
        mock.read_bytes(cluster_offsets[0].0, &mut buffer)?;
        Ok(buffer)
    }

    #[test]
    fn test_read_chain_clusters_empty_offsets_returns_error() {
        let data = create_mock_data(&[]);
        let cluster_offsets: &[(u64, u32)] = &[];
        let result = read_with_mock(&data, cluster_offsets);
        assert!(result.is_err());
        match result.unwrap_err() {
            NativeBackendError::InvalidParameter { context, .. } => {
                assert_eq!(context, "cluster_offsets is empty");
            }
            _ => panic!("Expected InvalidParameter error"),
        }
    }

    #[test]
    fn test_read_chain_clusters_single_cluster() {
        let clusters = [create_test_cluster(&[2, 3, 4])];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let result = read_with_mock(&data, &cluster_offsets);
        assert!(result.is_ok());
        let buffer = result.unwrap();
        assert_eq!(buffer.len(), clusters[0].len());
        assert_eq!(buffer, clusters[0]);
    }

    #[test]
    fn test_read_chain_clusters_multiple_contiguous() {
        let clusters = [
            create_test_cluster(&[2, 3]),
            create_test_cluster(&[4, 5, 6]),
            create_test_cluster(&[7, 8, 9, 10]),
        ];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let result = read_with_mock(&data, &cluster_offsets);
        assert!(result.is_ok());
        let buffer = result.unwrap();
        let expected_size: usize = clusters.iter().map(Vec::len).sum();
        assert_eq!(buffer.len(), expected_size);
        // Each cluster's bytes must appear in order, back-to-back.
        let mut pos = 0;
        for cluster in &clusters {
            assert_eq!(&buffer[pos..pos + cluster.len()], &cluster[..]);
            pos += cluster.len();
        }
    }

    #[test]
    fn test_read_chain_clusters_exceeds_max_size() {
        let oversized_size = MAX_CLUSTER_BUFFER_SIZE + 1;
        let cluster_offsets = [(1024, oversized_size as u32)];
        let data = create_mock_data(&[create_test_cluster(&[2])]);
        let result = read_with_mock(&data, &cluster_offsets);
        assert!(result.is_err());
        match result.unwrap_err() {
            NativeBackendError::RecordTooLarge { size, max_size } => {
                assert_eq!(size, oversized_size as u32);
                assert_eq!(max_size, MAX_CLUSTER_BUFFER_SIZE as u32);
            }
            _ => panic!("Expected RecordTooLarge error"),
        }
    }

    #[test]
    fn test_extract_neighbors_first_cluster() {
        let clusters = [
            create_test_cluster(&[2, 3]),
            create_test_cluster(&[4, 5, 6]),
            create_test_cluster(&[7, 8, 9, 10]),
        ];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let buffer = read_with_mock(&data, &cluster_offsets).expect("Failed to read clusters");
        let mut reader = SequentialClusterReader::new();
        let neighbors = reader
            .extract_neighbors(&buffer, 0, &cluster_offsets)
            .expect("Failed to extract neighbors");
        assert_eq!(neighbors, vec![2, 3]);
    }

    #[test]
    fn test_extract_neighbors_middle_cluster() {
        let clusters = [
            create_test_cluster(&[2, 3]),
            create_test_cluster(&[4, 5, 6]),
            create_test_cluster(&[7, 8, 9, 10]),
        ];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let buffer = read_with_mock(&data, &cluster_offsets).expect("Failed to read clusters");
        let mut reader = SequentialClusterReader::new();
        let neighbors = reader
            .extract_neighbors(&buffer, 1, &cluster_offsets)
            .expect("Failed to extract neighbors");
        assert_eq!(neighbors, vec![4, 5, 6]);
    }

    #[test]
    fn test_extract_neighbors_invalid_index() {
        let clusters = [create_test_cluster(&[2, 3]), create_test_cluster(&[4, 5, 6])];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let buffer = read_with_mock(&data, &cluster_offsets).expect("Failed to read clusters");
        let mut reader = SequentialClusterReader::new();
        let result = reader.extract_neighbors(&buffer, 5, &cluster_offsets);
        assert!(result.is_err());
        match result.unwrap_err() {
            NativeBackendError::InvalidParameter { context, .. } => {
                assert!(context.contains("cluster_index 5 out of bounds"));
            }
            _ => panic!("Expected InvalidParameter error"),
        }
    }

    #[test]
    fn test_extract_neighbors_last_cluster() {
        let clusters = [
            create_test_cluster(&[2, 3]),
            create_test_cluster(&[4, 5, 6]),
            create_test_cluster(&[7, 8, 9, 10]),
        ];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let buffer = read_with_mock(&data, &cluster_offsets).expect("Failed to read clusters");
        let mut reader = SequentialClusterReader::new();
        let neighbors = reader
            .extract_neighbors(&buffer, 2, &cluster_offsets)
            .expect("Failed to extract neighbors");
        assert_eq!(neighbors, vec![7, 8, 9, 10]);
    }

    #[test]
    fn test_extract_neighbors_all_clusters() {
        let clusters = [
            create_test_cluster(&[2, 3]),
            create_test_cluster(&[4, 5, 6]),
            create_test_cluster(&[7, 8, 9, 10]),
        ];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let buffer = read_with_mock(&data, &cluster_offsets).expect("Failed to read clusters");
        let mut reader = SequentialClusterReader::new();
        let neighbors_0 = reader
            .extract_neighbors(&buffer, 0, &cluster_offsets)
            .expect("Failed to extract neighbors from cluster 0");
        let neighbors_1 = reader
            .extract_neighbors(&buffer, 1, &cluster_offsets)
            .expect("Failed to extract neighbors from cluster 1");
        let neighbors_2 = reader
            .extract_neighbors(&buffer, 2, &cluster_offsets)
            .expect("Failed to extract neighbors from cluster 2");
        assert_eq!(neighbors_0, vec![2, 3]);
        assert_eq!(neighbors_1, vec![4, 5, 6]);
        assert_eq!(neighbors_2, vec![7, 8, 9, 10]);
    }

    #[test]
    fn test_max_cluster_buffer_size_constant() {
        assert_eq!(MAX_CLUSTER_BUFFER_SIZE, 512 * 1024);
    }

    #[test]
    fn test_read_chain_clusters_variable_cluster_sizes() {
        let clusters = [
            create_test_cluster(&[2]),
            create_test_cluster(&[3, 4, 5]),
            create_test_cluster(&[6]),
        ];
        let data = create_mock_data(&clusters);
        let cluster_offsets = contiguous_offsets(1024, &clusters);
        let result = read_with_mock(&data, &cluster_offsets);
        assert!(result.is_ok());
        let buffer = result.unwrap();
        let expected_size: usize = clusters.iter().map(Vec::len).sum();
        assert_eq!(buffer.len(), expected_size);
        let mut reader = SequentialClusterReader::new();
        let neighbors_0 = reader
            .extract_neighbors(&buffer, 0, &cluster_offsets)
            .expect("Failed to extract from cluster 0");
        let neighbors_1 = reader
            .extract_neighbors(&buffer, 1, &cluster_offsets)
            .expect("Failed to extract from cluster 1");
        let neighbors_2 = reader
            .extract_neighbors(&buffer, 2, &cluster_offsets)
            .expect("Failed to extract from cluster 2");
        assert_eq!(neighbors_0, vec![2]);
        assert_eq!(neighbors_1, vec![3, 4, 5]);
        assert_eq!(neighbors_2, vec![6]);
    }
}