use crate::degree_cache::{DegreeDelta, DegreeSidecar, DEGREE_DELTA_FILENAME};
use crate::dense_hnsw::{
dense_score_from_bytes, load_dense_hnsw_query_points, search_dense_hnsw_scoped_with_points,
search_dense_hnsw_with_points, validate_dense_hnsw_files, DenseHnswHeader, DenseQueryPoint,
DENSE_HNSW_GRAPH_FILENAME, DENSE_HNSW_META_FILENAME,
};
use crate::error::EngineError;
use crate::segment_writer::{
NODE_DENSE_VECTOR_BLOB_FILENAME, NODE_SPARSE_VECTOR_BLOB_FILENAME, NODE_VECTOR_META_ENTRY_SIZE,
NODE_VECTOR_META_FILENAME, SEGMENT_FORMAT_VERSION, SEGMENT_MAGIC,
};
use crate::sparse_postings::{
read_sparse_posting_groups, validate_sparse_posting_files, SPARSE_POSTINGS_FILENAME,
SPARSE_POSTING_INDEX_FILENAME,
};
use crate::types::*;
use memmap2::Mmap;
use serde::de::{DeserializeSeed, IgnoredAny, MapAccess, Visitor};
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs::File;
use std::ops::ControlFlow;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, OnceLock};
/// Bytes backing one segment file: either a memory map or nothing at all.
///
/// Dereferences to `&[u8]`, so it can be sliced and passed to the byte readers
/// in this file directly.
enum MappedData {
/// Contents served from a memory-mapped file.
Mmap(Mmap),
/// No contents; dereferences to an empty slice.
Empty,
}
/// Value type of the `secondary_eq_sidecars` cache held by `SegmentReader`.
struct SecondaryEqSidecarCacheEntry {
/// Bytes of the cached sidecar.
data: MappedData,
/// NOTE(review): presumably records whether `data` has already passed
/// validation — confirm at the site that populates the cache.
validated: bool,
}
/// Value type of the `secondary_range_sidecars` cache held by `SegmentReader`.
struct SecondaryRangeSidecarCacheEntry {
/// Bytes of the cached sidecar.
data: MappedData,
/// NOTE(review): presumably records whether `data` has already passed
/// validation — confirm at the site that populates the cache.
validated: bool,
}
/// Lets a `MappedData` stand in wherever a byte slice is expected.
impl Deref for MappedData {
    type Target = [u8];

    /// Borrows the mapped bytes, or an empty slice for `MappedData::Empty`.
    fn deref(&self) -> &[u8] {
        if let MappedData::Mmap(map) = self {
            &map[..]
        } else {
            &[]
        }
    }
}
fn read_u16_at(data: &[u8], offset: usize) -> Result<u16, EngineError> {
let end = offset
.checked_add(2)
.ok_or_else(|| EngineError::CorruptRecord("u16 offset overflow".into()))?;
let slice = data.get(offset..end).ok_or_else(|| {
EngineError::CorruptRecord(format!(
"u16 read at offset {} exceeds data length {}",
offset,
data.len()
))
})?;
Ok(u16::from_le_bytes(slice.try_into().unwrap()))
}
/// Reads the single byte at `offset` of `data`.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when `offset` is past the end of `data`.
fn read_u8_at(data: &[u8], offset: usize) -> Result<u8, EngineError> {
    match data.get(offset) {
        Some(&byte) => Ok(byte),
        None => Err(EngineError::CorruptRecord(format!(
            "u8 read at offset {} exceeds data length {}",
            offset,
            data.len()
        ))),
    }
}
fn read_u32_at(data: &[u8], offset: usize) -> Result<u32, EngineError> {
let end = offset
.checked_add(4)
.ok_or_else(|| EngineError::CorruptRecord("u32 offset overflow".into()))?;
let slice = data.get(offset..end).ok_or_else(|| {
EngineError::CorruptRecord(format!(
"u32 read at offset {} exceeds data length {}",
offset,
data.len()
))
})?;
Ok(u32::from_le_bytes(slice.try_into().unwrap()))
}
fn read_u64_at(data: &[u8], offset: usize) -> Result<u64, EngineError> {
let end = offset
.checked_add(8)
.ok_or_else(|| EngineError::CorruptRecord("u64 offset overflow".into()))?;
let slice = data.get(offset..end).ok_or_else(|| {
EngineError::CorruptRecord(format!(
"u64 read at offset {} exceeds data length {}",
offset,
data.len()
))
})?;
Ok(u64::from_le_bytes(slice.try_into().unwrap()))
}
fn collect_node_ids(nodes_data: &[u8]) -> Result<Vec<u64>, EngineError> {
if nodes_data.len() < 8 {
return Ok(Vec::new());
}
let count = read_u64_at(nodes_data, 0)? as usize;
let mut ids = Vec::with_capacity(count);
let idx_start = 8;
for index in 0..count {
let entry_off = idx_start + index * NODE_INDEX_ENTRY_SIZE;
ids.push(read_u64_at(nodes_data, entry_off)?);
}
Ok(ids)
}
fn read_i64_at(data: &[u8], offset: usize) -> Result<i64, EngineError> {
let end = offset
.checked_add(8)
.ok_or_else(|| EngineError::CorruptRecord("i64 offset overflow".into()))?;
let slice = data.get(offset..end).ok_or_else(|| {
EngineError::CorruptRecord(format!(
"i64 read at offset {} exceeds data length {}",
offset,
data.len()
))
})?;
Ok(i64::from_le_bytes(slice.try_into().unwrap()))
}
fn read_f32_at(data: &[u8], offset: usize) -> Result<f32, EngineError> {
let end = offset
.checked_add(4)
.ok_or_else(|| EngineError::CorruptRecord("f32 offset overflow".into()))?;
let slice = data.get(offset..end).ok_or_else(|| {
EngineError::CorruptRecord(format!(
"f32 read at offset {} exceeds data length {}",
offset,
data.len()
))
})?;
Ok(f32::from_le_bytes(slice.try_into().unwrap()))
}
/// Decodes an unsigned base-128 varint (7 payload bits per byte, least
/// significant group first, high bit set on every byte except the last)
/// starting at `offset`.
///
/// Returns the decoded value together with the number of bytes consumed.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when the input ends before the
/// terminating byte, when more than ten bytes carry a continuation bit, or
/// when the terminating tenth byte holds bits that do not fit in a `u64`.
fn read_varint_at(data: &[u8], offset: usize) -> Result<(u64, usize), EngineError> {
    let mut result: u64 = 0;
    let mut shift: u32 = 0;
    let mut pos = offset;
    loop {
        let Some(&byte) = data.get(pos) else {
            return Err(EngineError::CorruptRecord(format!(
                "varint read at offset {} exceeds data length {}",
                offset,
                data.len()
            )));
        };
        pos += 1;
        let payload = u64::from(byte & 0x7F);
        result |= payload << shift;
        if byte & 0x80 == 0 {
            // The tenth byte starts at bit 63, so only its lowest payload bit
            // fits; anything above it would be silently shifted out.
            if shift == 63 && payload > 1 {
                return Err(EngineError::CorruptRecord("varint overflows u64".into()));
            }
            return Ok((result, pos - offset));
        }
        shift += 7;
        if shift >= 70 {
            return Err(EngineError::CorruptRecord("varint too long".into()));
        }
    }
}
/// Borrows the `len` bytes of `data` that start at `offset`.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when `offset + len` overflows `usize`
/// or when the requested range is not fully inside `data`.
fn read_bytes_at(data: &[u8], offset: usize, len: usize) -> Result<&[u8], EngineError> {
    let Some(end) = offset.checked_add(len) else {
        return Err(EngineError::CorruptRecord(
            "byte slice offset overflow".into(),
        ));
    };
    match data.get(offset..end) {
        Some(bytes) => Ok(bytes),
        None => Err(EngineError::CorruptRecord(format!(
            "byte slice [{}, {}) exceeds data length {}",
            offset,
            end,
            data.len()
        ))),
    }
}
/// Factor by which the estimated cost of per-key binary searches is multiplied
/// before it is compared with the cost of a sequential merge walk.
const BATCH_RANDOM_ACCESS_PENALTY: usize = 4;

/// Stride, in bytes, of one node index entry: a `u64` node id followed by a
/// `u64` record offset.
const NODE_INDEX_ENTRY_SIZE: usize = 16;
/// Stride, in bytes, of one edge index entry: a `u64` edge id followed by a
/// `u64` record offset.
const EDGE_INDEX_ENTRY_SIZE: usize = 16;

// NOTE(review): the constants below are not referenced in this part of the
// file; they are presumably per-entry byte sizes of the corresponding on-disk
// tables — confirm against the segment writer.
const ADJ_INDEX_ENTRY_SIZE: usize = 24;
const TOMBSTONE_ENTRY_SIZE: usize = 25;
const TYPE_INDEX_ENTRY_SIZE: usize = 16;
const PROP_INDEX_ENTRY_SIZE: usize = 32;
const EDGE_TRIPLE_ENTRY_SIZE: usize = 28;
const SECONDARY_EQ_ENTRY_SIZE: usize = 20;
const SECONDARY_RANGE_ENTRY_SIZE: usize = 16;
/// How a batch of keys is resolved against a sorted on-disk index.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BatchReadStrategy {
/// Run an independent binary search for each distinct key.
SeekPerKey,
/// Advance one forward-only cursor through the index while walking the keys.
MergeWalk,
}
/// Returns `ceil(log2(n))`, with both `0` and `1` mapping to `0`.
#[inline]
fn ceil_log2_usize(n: usize) -> usize {
    match n {
        0 | 1 => 0,
        // Bit length of `n - 1` equals the ceiling of log2(n) for n >= 2.
        _ => (usize::BITS - (n - 1).leading_zeros()) as usize,
    }
}
/// Binary-searches a table of fixed-size entries for the first entry whose
/// `u64` key is not less than `target`.
///
/// Entry `i` starts at `idx_start + i * entry_size`; its little-endian key sits
/// `key_offset` bytes into the entry. Keys must be sorted ascending.
///
/// Returns an index in `0..=count` (`count` when every key is below `target`).
///
/// # Errors
/// Propagates `EngineError::CorruptRecord` from any key read that falls
/// outside `data`.
fn lower_bound_u64_index(
    data: &[u8],
    idx_start: usize,
    count: usize,
    entry_size: usize,
    key_offset: usize,
    target: u64,
) -> Result<usize, EngineError> {
    // Search window is `[base, base + remaining)`.
    let mut base = 0usize;
    let mut remaining = count;
    while remaining > 0 {
        let half = remaining / 2;
        let probe = base + half;
        let key = read_u64_at(data, idx_start + probe * entry_size + key_offset)?;
        if key < target {
            base = probe + 1;
            remaining -= half + 1;
        } else {
            remaining = half;
        }
    }
    Ok(base)
}
/// Binary-searches a table of fixed-size entries for the first entry whose
/// `u64` key is strictly greater than `target`.
///
/// Entry `i` starts at `idx_start + i * entry_size`; its little-endian key sits
/// `key_offset` bytes into the entry. Keys must be sorted ascending.
///
/// Returns an index in `0..=count` (`count` when no key exceeds `target`).
///
/// # Errors
/// Propagates `EngineError::CorruptRecord` from any key read that falls
/// outside `data`.
fn upper_bound_u64_index(
    data: &[u8],
    idx_start: usize,
    count: usize,
    entry_size: usize,
    key_offset: usize,
    target: u64,
) -> Result<usize, EngineError> {
    // Search window is `[base, base + remaining)`.
    let mut base = 0usize;
    let mut remaining = count;
    while remaining > 0 {
        let half = remaining / 2;
        let probe = base + half;
        let key = read_u64_at(data, idx_start + probe * entry_size + key_offset)?;
        if key > target {
            remaining = half;
        } else {
            base = probe + 1;
            remaining -= half + 1;
        }
    }
    Ok(base)
}
/// Picks between per-key binary search and a sequential merge walk for a batch
/// lookup against a sorted fixed-entry index.
///
/// The index is expected to start with an 8-byte header, followed by
/// `index_count` entries of `entry_size` bytes whose `u64` key sits at
/// `key_offset`. `min_key`/`max_key` bound the batch; `unique_keys` is the
/// number of distinct keys in it.
///
/// The estimated seek cost is
/// `unique_keys * ceil(log2(index_count)) * BATCH_RANDOM_ACCESS_PENALTY`; the
/// walk cost is the number of index entries between the bounds of the batch
/// (never less than `unique_keys`). Ties go to `SeekPerKey`.
///
/// # Errors
/// Propagates `EngineError::CorruptRecord` from the bound searches.
fn choose_batch_read_strategy(
    index_data: &[u8],
    index_count: usize,
    entry_size: usize,
    key_offset: usize,
    unique_keys: usize,
    min_key: u64,
    max_key: u64,
) -> Result<BatchReadStrategy, EngineError> {
    // Tiny batches and tiny indexes are always resolved with direct seeks.
    if unique_keys <= 2 || index_count <= 1 {
        return Ok(BatchReadStrategy::SeekPerKey);
    }

    const INDEX_START: usize = 8;
    let first_covered = lower_bound_u64_index(
        index_data,
        INDEX_START,
        index_count,
        entry_size,
        key_offset,
        min_key,
    )?;
    let past_last_covered = upper_bound_u64_index(
        index_data,
        INDEX_START,
        index_count,
        entry_size,
        key_offset,
        max_key,
    )?;

    let walk_cost = past_last_covered
        .saturating_sub(first_covered)
        .max(unique_keys);
    let probes_per_key = ceil_log2_usize(index_count);
    let seek_cost = unique_keys
        .saturating_mul(probes_per_key)
        .saturating_mul(BATCH_RANDOM_ACCESS_PENALTY);

    Ok(if seek_cost <= walk_cost {
        BatchReadStrategy::SeekPerKey
    } else {
        BatchReadStrategy::MergeWalk
    })
}
/// Binary-searches the key index for the first entry that is not less than
/// `(target_type, target_key)`.
///
/// `offset_table_start` is the position of a table of `count` little-endian
/// `u64` entry offsets. Each entry holds a `u32` type id at `+0`, a `u16` key
/// length at `+12` and the UTF-8 key bytes from `+14`. Entries are compared by
/// type id first, then by key.
///
/// Returns an index in `0..=count`.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when a read falls outside `data` or a
/// key is not valid UTF-8.
fn lower_bound_key_index(
    data: &[u8],
    offset_table_start: usize,
    count: usize,
    target_type: u32,
    target_key: &str,
) -> Result<usize, EngineError> {
    // Search window is `[base, base + remaining)`.
    let mut base = 0usize;
    let mut remaining = count;
    while remaining > 0 {
        let half = remaining / 2;
        let probe = base + half;
        let entry_offset = read_u64_at(data, offset_table_start + probe * 8)? as usize;
        let entry_type = read_u32_at(data, entry_offset)?;
        let key_len = read_u16_at(data, entry_offset + 12)? as usize;
        let key_bytes = read_bytes_at(data, entry_offset + 14, key_len)?;
        let entry_key = match std::str::from_utf8(key_bytes) {
            Ok(text) => text,
            Err(_) => {
                return Err(EngineError::CorruptRecord(format!(
                    "invalid UTF-8 in key index at offset {}",
                    entry_offset + 14
                )));
            }
        };
        let ordering = entry_type
            .cmp(&target_type)
            .then_with(|| entry_key.cmp(target_key));
        if ordering == std::cmp::Ordering::Less {
            base = probe + 1;
            remaining -= half + 1;
        } else {
            remaining = half;
        }
    }
    Ok(base)
}
/// Binary-searches the key index for the first entry that is strictly greater
/// than `(target_type, target_key)`.
///
/// `offset_table_start` is the position of a table of `count` little-endian
/// `u64` entry offsets. Each entry holds a `u32` type id at `+0`, a `u16` key
/// length at `+12` and the UTF-8 key bytes from `+14`. Entries are compared by
/// type id first, then by key.
///
/// Returns an index in `0..=count`.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when a read falls outside `data` or a
/// key is not valid UTF-8.
fn upper_bound_key_index(
    data: &[u8],
    offset_table_start: usize,
    count: usize,
    target_type: u32,
    target_key: &str,
) -> Result<usize, EngineError> {
    // Search window is `[base, base + remaining)`.
    let mut base = 0usize;
    let mut remaining = count;
    while remaining > 0 {
        let half = remaining / 2;
        let probe = base + half;
        let entry_offset = read_u64_at(data, offset_table_start + probe * 8)? as usize;
        let entry_type = read_u32_at(data, entry_offset)?;
        let key_len = read_u16_at(data, entry_offset + 12)? as usize;
        let key_bytes = read_bytes_at(data, entry_offset + 14, key_len)?;
        let entry_key = match std::str::from_utf8(key_bytes) {
            Ok(text) => text,
            Err(_) => {
                return Err(EngineError::CorruptRecord(format!(
                    "invalid UTF-8 in key index at offset {}",
                    entry_offset + 14
                )));
            }
        };
        let ordering = entry_type
            .cmp(&target_type)
            .then_with(|| entry_key.cmp(target_key));
        if ordering == std::cmp::Ordering::Greater {
            remaining = half;
        } else {
            base = probe + 1;
            remaining -= half + 1;
        }
    }
    Ok(base)
}
// NOTE(review): none of the constants below is referenced in this part of the
// file; the descriptions are inferred from the names and need confirming
// against the segment writer.
/// Presumably the byte size of one node metadata entry.
const NODE_META_ENTRY_SIZE: usize = 60;
/// Presumably the byte size of one edge metadata entry.
const EDGE_META_ENTRY_SIZE: usize = 80;
/// Bit 0 of a flags byte; presumably marks a node that has a dense vector.
const NODE_VECTOR_FLAG_DENSE: u8 = 0b0000_0001;
/// Bit 1 of a flags byte; presumably marks a node that has a sparse vector.
const NODE_VECTOR_FLAG_SPARSE: u8 = 0b0000_0010;
/// Presumably the byte size of one dense vector component (matches `f32`).
const DENSE_VECTOR_VALUE_SIZE: usize = 4;
/// Presumably the byte size of one sparse vector entry.
const SPARSE_VECTOR_ENTRY_SIZE: usize = 8;
/// Per-node values needed to filter and score a dense-vector candidate.
#[derive(Clone, Copy)]
struct DenseScoringMeta {
/// Passed to the caller's `include` filter as its first argument.
type_id: u32,
/// Passed to the caller's `include` filter as its second argument.
updated_at: i64,
/// Passed to the caller's `include` filter as its third argument.
weight: f32,
/// Handed to `dense_score_from_bytes` together with the dense vector blob.
dense_offset: usize,
/// Handed to `dense_score_from_bytes`; `0` means the node is not scored.
dense_len: usize,
}
/// Per-node values needed to filter and score a sparse-vector candidate.
#[derive(Clone, Copy)]
struct SparseScoringMeta {
/// Passed to the caller's `include` filter as its first argument.
type_id: u32,
/// Passed to the caller's `include` filter as its second argument.
updated_at: i64,
/// Passed to the caller's `include` filter as its third argument.
weight: f32,
/// Handed to `sparse_dot_score_from_blob` together with the sparse vector blob.
sparse_offset: usize,
/// Handed to `sparse_dot_score_from_blob`; `0` means the node is not scored.
sparse_len: usize,
}
/// Read-only view over the files of one segment directory.
///
/// Built by `SegmentReader::open`, which maps the segment's files and loads
/// its tombstones.
pub struct SegmentReader {
/// Identifier passed to `open` by the caller.
pub segment_id: u64,
/// Value returned by `read_format_version` for the segment directory.
#[allow(dead_code)] format_version: u32,
/// Directory the segment was opened from.
seg_dir: PathBuf,
/// `nodes.dat`: `u64` entry count, node index entries, then node records.
nodes_mmap: MappedData,
/// `edges.dat`: `u64` entry count, edge index entries, then edge records.
edges_mmap: MappedData,
/// `adj_out.idx`.
adj_out_idx: MappedData,
/// `adj_out.dat`.
adj_out_dat: MappedData,
/// `adj_in.idx`.
adj_in_idx: MappedData,
/// `adj_in.dat`.
adj_in_dat: MappedData,
/// `key_index.dat`.
key_index_mmap: MappedData,
/// `node_type_index.dat` (opened as optional).
node_type_index_mmap: MappedData,
/// `edge_type_index.dat` (opened as optional).
edge_type_index_mmap: MappedData,
/// `prop_index.dat` (opened as optional).
prop_node_index_mmap: MappedData,
/// `edge_triple_index.dat` (opened as optional).
edge_triple_index_mmap: MappedData,
/// `node_meta.dat`.
node_meta_mmap: MappedData,
/// `edge_meta.dat`.
edge_meta_mmap: MappedData,
/// `node_prop_hashes.dat` (opened as optional).
node_prop_hashes_mmap: MappedData,
/// File named by `NODE_VECTOR_META_FILENAME` (opened as optional).
node_vector_meta_mmap: MappedData,
/// File named by `NODE_DENSE_VECTOR_BLOB_FILENAME` (opened as optional).
node_dense_vectors_mmap: MappedData,
/// File named by `NODE_SPARSE_VECTOR_BLOB_FILENAME` (opened as optional).
node_sparse_vectors_mmap: MappedData,
/// File named by `DENSE_HNSW_META_FILENAME` (opened as optional).
dense_hnsw_meta_mmap: MappedData,
/// File named by `DENSE_HNSW_GRAPH_FILENAME` (opened as optional).
dense_hnsw_graph_mmap: MappedData,
/// Result of `validate_dense_hnsw_files`.
dense_hnsw_header: Option<DenseHnswHeader>,
/// Points loaded by `load_dense_hnsw_query_points`; empty without a header.
dense_hnsw_points: Vec<DenseQueryPoint>,
/// File named by `SPARSE_POSTING_INDEX_FILENAME` (opened as optional).
sparse_posting_index_mmap: MappedData,
/// File named by `SPARSE_POSTINGS_FILENAME` (opened as optional).
sparse_postings_mmap: MappedData,
/// Result of `DegreeSidecar::open_optional` on `DEGREE_DELTA_FILENAME`.
degree_delta: Option<DegreeSidecar>,
/// `timestamp_index.dat`.
timestamp_index_mmap: MappedData,
/// Node tombstones loaded from `tombstones.dat`; these ids read as absent.
deleted_nodes: NodeIdMap<TombstoneEntry>,
/// Edge tombstones loaded from `tombstones.dat`; these ids read as absent.
deleted_edges: NodeIdMap<TombstoneEntry>,
/// Cache of secondary equality sidecars; starts empty.
secondary_eq_sidecars: Mutex<HashMap<u64, SecondaryEqSidecarCacheEntry>>,
/// Cache of secondary range sidecars; starts empty.
secondary_range_sidecars: Mutex<HashMap<u64, SecondaryRangeSidecarCacheEntry>>,
/// Set-once cell of node ids; starts unset.
node_ids: OnceLock<Box<[u64]>>,
/// Entry count from the header of `nodes.dat` (0 when the header is missing).
node_count: u64,
/// Entry count from the header of `edges.dat` (0 when the header is missing).
edge_count: u64,
}
impl SegmentReader {
pub fn open(
seg_dir: &Path,
segment_id: u64,
dense_config: Option<&DenseVectorConfig>,
) -> Result<Self, EngineError> {
let format_version = read_format_version(seg_dir)?;
let vector_meta_path = seg_dir.join(NODE_VECTOR_META_FILENAME);
let dense_blob_path = seg_dir.join(NODE_DENSE_VECTOR_BLOB_FILENAME);
let sparse_blob_path = seg_dir.join(NODE_SPARSE_VECTOR_BLOB_FILENAME);
let dense_hnsw_meta_path = seg_dir.join(DENSE_HNSW_META_FILENAME);
let dense_hnsw_graph_path = seg_dir.join(DENSE_HNSW_GRAPH_FILENAME);
let sparse_posting_index_path = seg_dir.join(SPARSE_POSTING_INDEX_FILENAME);
let sparse_postings_path = seg_dir.join(SPARSE_POSTINGS_FILENAME);
let degree_delta_path = seg_dir.join(DEGREE_DELTA_FILENAME);
let nodes_mmap = mmap_file(&seg_dir.join("nodes.dat"))?;
let edges_mmap = mmap_file(&seg_dir.join("edges.dat"))?;
let adj_out_idx = mmap_file(&seg_dir.join("adj_out.idx"))?;
let adj_out_dat = mmap_file(&seg_dir.join("adj_out.dat"))?;
let adj_in_idx = mmap_file(&seg_dir.join("adj_in.idx"))?;
let adj_in_dat = mmap_file(&seg_dir.join("adj_in.dat"))?;
let key_index_mmap = mmap_file(&seg_dir.join("key_index.dat"))?;
let node_type_index_mmap = mmap_file_optional(&seg_dir.join("node_type_index.dat"))?;
let edge_type_index_mmap = mmap_file_optional(&seg_dir.join("edge_type_index.dat"))?;
let prop_node_index_mmap = mmap_file_optional(&seg_dir.join("prop_index.dat"))?;
let edge_triple_index_mmap = mmap_file_optional(&seg_dir.join("edge_triple_index.dat"))?;
let node_meta_mmap = mmap_file(&seg_dir.join("node_meta.dat"))?;
let edge_meta_mmap = mmap_file(&seg_dir.join("edge_meta.dat"))?;
let node_prop_hashes_mmap = mmap_file_optional(&seg_dir.join("node_prop_hashes.dat"))?;
let node_vector_meta_mmap = mmap_file_optional(&vector_meta_path)?;
let node_dense_vectors_mmap = mmap_file_optional(&dense_blob_path)?;
let node_sparse_vectors_mmap = mmap_file_optional(&sparse_blob_path)?;
let dense_hnsw_meta_mmap = mmap_file_optional(&dense_hnsw_meta_path)?;
let dense_hnsw_graph_mmap = mmap_file_optional(&dense_hnsw_graph_path)?;
let sparse_posting_index_mmap = mmap_file_optional(&sparse_posting_index_path)?;
let sparse_postings_mmap = mmap_file_optional(&sparse_postings_path)?;
let degree_delta = DegreeSidecar::open_optional(°ree_delta_path);
let timestamp_index_mmap = mmap_file(&seg_dir.join("timestamp_index.dat"))?;
let (deleted_nodes, deleted_edges) = load_tombstones(&seg_dir.join("tombstones.dat"))?;
let node_count = if nodes_mmap.len() >= 8 {
read_u64_at(&nodes_mmap, 0)?
} else {
0
};
let edge_count = if edges_mmap.len() >= 8 {
read_u64_at(&edges_mmap, 0)?
} else {
0
};
let node_meta_count = if node_meta_mmap.len() >= 8 {
read_u64_at(&node_meta_mmap, 0)?
} else {
0
};
if format_version < 6
&& (!node_vector_meta_mmap.is_empty()
|| !node_dense_vectors_mmap.is_empty()
|| !node_sparse_vectors_mmap.is_empty())
{
return Err(EngineError::CorruptRecord(format!(
"segment {} has unexpected vector sidecars for format version {}",
segment_id, format_version
)));
}
if format_version < 7
&& (!dense_hnsw_meta_mmap.is_empty() || !dense_hnsw_graph_mmap.is_empty())
{
return Err(EngineError::CorruptRecord(format!(
"segment {} has unexpected dense HNSW files for format version {}",
segment_id, format_version
)));
}
if format_version < 8
&& (!sparse_posting_index_mmap.is_empty() || !sparse_postings_mmap.is_empty())
{
return Err(EngineError::CorruptRecord(format!(
"segment {} has unexpected sparse posting files for format version {}",
segment_id, format_version
)));
}
let vector_summary = validate_node_vector_sidecars(
segment_id,
&node_vector_meta_mmap,
&node_dense_vectors_mmap,
&node_sparse_vectors_mmap,
node_meta_count,
)?;
let dense_hnsw_header = validate_dense_hnsw_files(
&dense_hnsw_meta_mmap,
&dense_hnsw_graph_mmap,
node_dense_vectors_mmap.len(),
vector_summary.dense_count,
dense_config,
)?;
let dense_hnsw_points = if let Some(header) = dense_hnsw_header {
load_dense_hnsw_query_points(&dense_hnsw_meta_mmap, header)?
} else {
Vec::new()
};
if format_version < 8 && vector_summary.sparse_count > 0 {
return Err(EngineError::CorruptRecord(format!(
"segment {} format version {} predates sparse posting support; sparse-bearing segments must be rewritten as v8",
segment_id, format_version
)));
}
validate_sparse_posting_files(
&sparse_posting_index_mmap,
&sparse_postings_mmap,
vector_summary.sparse_count,
true,
)?;
validate_sparse_posting_parity(
segment_id,
&node_meta_mmap,
&node_vector_meta_mmap,
&node_sparse_vectors_mmap,
&sparse_posting_index_mmap,
&sparse_postings_mmap,
)?;
Ok(SegmentReader {
segment_id,
format_version,
seg_dir: seg_dir.to_path_buf(),
nodes_mmap,
edges_mmap,
adj_out_idx,
adj_out_dat,
adj_in_idx,
adj_in_dat,
key_index_mmap,
node_type_index_mmap,
edge_type_index_mmap,
prop_node_index_mmap,
edge_triple_index_mmap,
node_meta_mmap,
edge_meta_mmap,
node_prop_hashes_mmap,
node_vector_meta_mmap,
node_dense_vectors_mmap,
node_sparse_vectors_mmap,
dense_hnsw_meta_mmap,
dense_hnsw_graph_mmap,
dense_hnsw_header,
dense_hnsw_points,
sparse_posting_index_mmap,
sparse_postings_mmap,
degree_delta,
timestamp_index_mmap,
deleted_nodes,
deleted_edges,
secondary_eq_sidecars: Mutex::new(HashMap::new()),
secondary_range_sidecars: Mutex::new(HashMap::new()),
node_ids: OnceLock::new(),
node_count,
edge_count,
})
}
/// Loads the node with the given id from this segment.
///
/// Returns `Ok(None)` when the id is tombstoned in this segment or absent
/// from its node index. Otherwise the record is decoded, its vectors are
/// hydrated and `last_write_seq` is filled in from the node metadata.
///
/// # Errors
/// Propagates errors from the index search, record decoding, vector
/// hydration and metadata lookup.
pub fn get_node(&self, id: u64) -> Result<Option<NodeRecord>, EngineError> {
    let tombstoned = self.deleted_nodes.contains_key(&id);
    if tombstoned {
        return Ok(None);
    }
    let Some((slot, record_offset)) = self.binary_search_node_index(id)? else {
        return Ok(None);
    };
    let mut record = decode_node_at(&self.nodes_mmap, record_offset, id)?;
    self.hydrate_node_vectors(slot, &mut record)?;
    // The write sequence is the last (tenth) element of the metadata tuple.
    record.last_write_seq = self.node_meta_at(slot)?.9;
    Ok(Some(record))
}
/// Loads the edge with the given id from this segment.
///
/// Returns `Ok(None)` when the id is tombstoned in this segment or absent
/// from its edge index. Otherwise the record is decoded and `last_write_seq`
/// is filled in from the edge metadata.
///
/// # Errors
/// Propagates errors from the index search, record decoding and metadata
/// lookup.
pub fn get_edge(&self, id: u64) -> Result<Option<EdgeRecord>, EngineError> {
    let tombstoned = self.deleted_edges.contains_key(&id);
    if tombstoned {
        return Ok(None);
    }
    let Some((slot, record_offset)) = self.binary_search_edge_index(id)? else {
        return Ok(None);
    };
    let mut record = decode_edge_at(&self.edges_mmap, record_offset, id)?;
    // The write sequence is the last (eleventh) element of the metadata tuple.
    record.last_write_seq = self.edge_meta_at(slot)?.10;
    Ok(Some(record))
}
/// Reads only the fixed-position fields of an edge record, without decoding
/// the full record.
///
/// The returned tuple is
/// `(from, to, created_at, updated_at, weight, valid_from, valid_to)`, read
/// at byte offsets `+0`, `+8`, `+20`, `+28`, `+36`, `+40` and `+48` from the
/// start of the record. Returns `Ok(None)` when the id is tombstoned in this
/// segment or absent from its edge index.
///
/// # Errors
/// Propagates errors from the index search and from any field read that
/// falls outside the edge data.
#[allow(clippy::type_complexity)]
pub(crate) fn get_edge_core(
    &self,
    id: u64,
) -> Result<Option<(u64, u64, i64, i64, f32, i64, i64)>, EngineError> {
    if self.deleted_edges.contains_key(&id) {
        return Ok(None);
    }
    let Some((_, base)) = self.binary_search_edge_index(id)? else {
        return Ok(None);
    };
    let bytes: &[u8] = &self.edges_mmap;
    // Tuple fields are evaluated left to right, so reads happen in field order.
    let core = (
        read_u64_at(bytes, base)?,
        read_u64_at(bytes, base + 8)?,
        read_i64_at(bytes, base + 20)?,
        read_i64_at(bytes, base + 28)?,
        read_f32_at(bytes, base + 36)?,
        read_i64_at(bytes, base + 40)?,
        read_i64_at(bytes, base + 48)?,
    );
    Ok(Some(core))
}
/// Looks a node up by its `(type_id, key)` pair.
///
/// Resolves the pair to a node id through the key index and then loads the
/// node with `get_node`; returns `Ok(None)` when the key is not indexed or
/// the node is tombstoned.
///
/// # Errors
/// Propagates errors from the key index search and from `get_node`.
pub fn node_by_key(&self, type_id: u32, key: &str) -> Result<Option<NodeRecord>, EngineError> {
    match self.binary_search_key_index(type_id, key)? {
        Some(node_id) => self.get_node(node_id),
        None => Ok(None),
    }
}
/// Resolves a batch of `(result slot, type_id, key)` lookups to node records.
///
/// Every key found in this segment's key index contributes its slot to the
/// returned vector, including keys whose node is tombstoned here. Only the
/// non-tombstoned nodes are then loaded, sorted by node id, into `results`
/// through `get_nodes_batch`.
///
/// # Errors
/// Propagates errors from key resolution and from `get_nodes_batch`.
pub fn resolve_keys_batch(
    &self,
    lookups: &[(usize, u32, &str)],
    results: &mut [Option<NodeRecord>],
) -> Result<Vec<usize>, EngineError> {
    if lookups.is_empty() {
        return Ok(Vec::new());
    }
    let id_hits = self.resolve_keys_to_ids(lookups)?;
    if id_hits.is_empty() {
        return Ok(Vec::new());
    }

    let mut found_indices: Vec<usize> = Vec::with_capacity(id_hits.len());
    let mut live: Vec<(usize, u64)> = Vec::new();
    for &(slot, node_id) in &id_hits {
        found_indices.push(slot);
        if !self.deleted_nodes.contains_key(&node_id) {
            live.push((slot, node_id));
        }
    }

    live.sort_unstable_by_key(|&(_, node_id)| node_id);
    self.get_nodes_batch(&live, results)?;
    Ok(found_indices)
}
/// Resolves `(result slot, type_id, key)` lookups to `(result slot, node id)`
/// pairs using the key index; lookups without a matching entry are omitted.
///
/// Assumes `lookups` is sorted ascending by `(type_id, key)`: the first and
/// last elements are used as the batch bounds, duplicates are only detected
/// when adjacent, and the merge walk only moves forward — TODO confirm that
/// all callers sort their input.
///
/// Tombstones are not consulted here.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when a read falls outside the key
/// index or a stored key is not valid UTF-8.
fn resolve_keys_to_ids(
&self,
lookups: &[(usize, u32, &str)],
) -> Result<Vec<(usize, u64)>, EngineError> {
let mut resolved = Vec::new();
let data = &self.key_index_mmap[..];
// A key index too short for its u64 count header holds nothing.
if data.len() < 8 {
return Ok(resolved);
}
let count = read_u64_at(data, 0)? as usize;
if count == 0 {
return Ok(resolved);
}
// The table of u64 entry offsets follows the 8-byte count header.
let offset_table_start = 8;
// Count runs of equal adjacent lookups.
let unique_keys = {
let mut n = 0usize;
let mut prev: Option<(u32, &str)> = None;
for &(_, tid, key) in lookups {
if prev != Some((tid, key)) {
n += 1;
prev = Some((tid, key));
}
}
n
};
// Same cost model as `choose_batch_read_strategy`, but on (type, key) pairs.
let strategy = if unique_keys <= 2 || count <= 1 {
BatchReadStrategy::SeekPerKey
} else {
let (min_type, min_key) = (lookups[0].1, lookups[0].2);
let (max_type, max_key) = (lookups[lookups.len() - 1].1, lookups[lookups.len() - 1].2);
let span_start =
lower_bound_key_index(data, offset_table_start, count, min_type, min_key)?;
let span_end =
upper_bound_key_index(data, offset_table_start, count, max_type, max_key)?;
let span = span_end.saturating_sub(span_start).max(unique_keys);
let seek_cost = unique_keys
.saturating_mul(ceil_log2_usize(count))
.saturating_mul(BATCH_RANDOM_ACCESS_PENALTY);
if seek_cost <= span {
BatchReadStrategy::SeekPerKey
} else {
BatchReadStrategy::MergeWalk
}
};
if strategy == BatchReadStrategy::SeekPerKey {
// Reuse the previous search result when the same lookup repeats.
let mut prev_query: Option<(u32, &str)> = None;
let mut prev_node_id: Option<u64> = None;
for &(orig_idx, type_id, key) in lookups {
let node_id = if prev_query == Some((type_id, key)) {
prev_node_id
} else {
let found = self.binary_search_key_index(type_id, key)?;
prev_query = Some((type_id, key));
prev_node_id = found;
found
};
if let Some(nid) = node_id {
resolved.push((orig_idx, nid));
}
}
} else {
// One forward-only cursor over the index, shared by all lookups.
let mut idx_pos = 0usize;
for &(orig_idx, type_id, key) in lookups {
while idx_pos < count {
let entry_offset =
read_u64_at(data, offset_table_start + idx_pos * 8)? as usize;
// Entry layout: u32 type at +0, u64 node id at +4, u16 key length at +12,
// key bytes from +14.
let entry_type = read_u32_at(data, entry_offset)?;
let key_len = read_u16_at(data, entry_offset + 12)? as usize;
let key_bytes = read_bytes_at(data, entry_offset + 14, key_len)?;
let entry_key = std::str::from_utf8(key_bytes).map_err(|_| {
EngineError::CorruptRecord(format!(
"invalid UTF-8 in key index at offset {}",
entry_offset + 14
))
})?;
match (entry_type, entry_key).cmp(&(type_id, key)) {
std::cmp::Ordering::Less => {
idx_pos += 1;
}
std::cmp::Ordering::Equal => {
// The cursor stays on the match so a repeated lookup hits it again.
let node_id = read_u64_at(data, entry_offset + 4)?;
resolved.push((orig_idx, node_id));
break;
}
std::cmp::Ordering::Greater => {
break;
}
}
}
}
}
Ok(resolved)
}
/// Collects the adjacency entries of `node_id` in the requested direction.
///
/// `Outgoing` and `Incoming` read the matching adjacency files with `limit`
/// passed straight through. `Both` reads outgoing entries first, recording
/// the self-loop edge ids it sees, then reads incoming entries with that set
/// so the incoming pass can recognise those edges. With a non-zero `limit`
/// the incoming pass is skipped once the outgoing pass has filled the limit,
/// and otherwise receives the remaining budget.
///
/// NOTE(review): `limit == 0` presumably means "no limit" inside
/// `collect_adj_neighbors` — confirm there.
///
/// # Errors
/// Propagates errors from `collect_adj_neighbors`.
pub fn neighbors(
    &self,
    node_id: u64,
    direction: Direction,
    type_filter: Option<&[u32]>,
    limit: usize,
) -> Result<Vec<NeighborEntry>, EngineError> {
    let mut found = Vec::new();
    match direction {
        Direction::Outgoing => {
            self.collect_adj_neighbors(
                &self.adj_out_idx,
                &self.adj_out_dat,
                node_id,
                type_filter,
                limit,
                None,
                None,
                None,
                &mut found,
            )?;
        }
        Direction::Incoming => {
            self.collect_adj_neighbors(
                &self.adj_in_idx,
                &self.adj_in_dat,
                node_id,
                type_filter,
                limit,
                None,
                None,
                None,
                &mut found,
            )?;
        }
        Direction::Both => {
            // Outgoing pass: identical for limited and unlimited requests.
            let mut self_loops = NodeIdSet::default();
            self.collect_adj_neighbors(
                &self.adj_out_idx,
                &self.adj_out_dat,
                node_id,
                type_filter,
                limit,
                Some(&mut self_loops),
                None,
                None,
                &mut found,
            )?;

            if limit == 0 {
                self.collect_adj_neighbors(
                    &self.adj_in_idx,
                    &self.adj_in_dat,
                    node_id,
                    type_filter,
                    0,
                    None,
                    Some(&self_loops),
                    None,
                    &mut found,
                )?;
            } else {
                let mut budget = limit.saturating_sub(found.len());
                if budget == 0 {
                    return Ok(found);
                }
                self.collect_adj_neighbors(
                    &self.adj_in_idx,
                    &self.adj_in_dat,
                    node_id,
                    type_filter,
                    0,
                    None,
                    Some(&self_loops),
                    Some(&mut budget),
                    &mut found,
                )?;
            }
        }
    }
    Ok(found)
}
/// Reports whether this segment holds a tombstone for node `id`.
pub fn is_node_deleted(&self, id: u64) -> bool {
    let tombstones = &self.deleted_nodes;
    tombstones.contains_key(&id)
}
/// Loads a batch of nodes, writing each hit into `results[slot]`.
///
/// `lookups` holds `(result slot, node id)` pairs and is expected to be
/// sorted ascending by node id: its first and last ids are used as the batch
/// bounds and the merge walk only moves forward. Ids that are tombstoned in
/// this segment or missing from its index leave their slot untouched.
///
/// # Errors
/// Propagates errors from index reads, record decoding, vector hydration and
/// metadata lookup.
///
/// # Panics
/// Panics if a slot in `lookups` is out of bounds for `results`.
pub fn get_nodes_batch(
    &self,
    lookups: &[(usize, u64)],
    results: &mut [Option<NodeRecord>],
) -> Result<(), EngineError> {
    let (Some(&(_, min_key)), Some(&(_, max_key))) = (lookups.first(), lookups.last()) else {
        return Ok(());
    };
    let index: &[u8] = &self.nodes_mmap;
    if index.len() < 8 {
        return Ok(());
    }
    let entry_count = read_u64_at(index, 0)? as usize;
    if entry_count == 0 {
        return Ok(());
    }
    const INDEX_START: usize = 8;

    // One for the first lookup plus one for every change between neighbours.
    let unique_keys = 1 + lookups
        .windows(2)
        .filter(|pair| pair[0].1 != pair[1].1)
        .count();
    let strategy = choose_batch_read_strategy(
        index,
        entry_count,
        NODE_INDEX_ENTRY_SIZE,
        0,
        unique_keys,
        min_key,
        max_key,
    )?;

    match strategy {
        BatchReadStrategy::SeekPerKey => {
            // Last searched id and what the search returned, reused for repeats.
            let mut memo: Option<(u64, Option<(usize, usize)>)> = None;
            for &(slot, wanted) in lookups {
                if self.deleted_nodes.contains_key(&wanted) {
                    continue;
                }
                let located = match memo {
                    Some((id, hit)) if id == wanted => hit,
                    _ => {
                        let hit = self.binary_search_node_index(wanted)?;
                        memo = Some((wanted, hit));
                        hit
                    }
                };
                if let Some((index_pos, record_offset)) = located {
                    let mut node = decode_node_at(&self.nodes_mmap, record_offset, wanted)?;
                    self.hydrate_node_vectors(index_pos, &mut node)?;
                    node.last_write_seq = self.node_meta_at(index_pos)?.9;
                    results[slot] = Some(node);
                }
            }
        }
        BatchReadStrategy::MergeWalk => {
            // Forward-only cursor; it stays on a match so repeated ids hit again.
            let mut cursor = 0usize;
            for &(slot, wanted) in lookups {
                if self.deleted_nodes.contains_key(&wanted) {
                    continue;
                }
                while cursor < entry_count {
                    let entry_off = INDEX_START + cursor * NODE_INDEX_ENTRY_SIZE;
                    let entry_id = read_u64_at(index, entry_off)?;
                    match entry_id.cmp(&wanted) {
                        std::cmp::Ordering::Less => cursor += 1,
                        std::cmp::Ordering::Equal => {
                            let record_offset = read_u64_at(index, entry_off + 8)? as usize;
                            let mut node =
                                decode_node_at(&self.nodes_mmap, record_offset, entry_id)?;
                            self.hydrate_node_vectors(cursor, &mut node)?;
                            node.last_write_seq = self.node_meta_at(cursor)?.9;
                            results[slot] = Some(node);
                            break;
                        }
                        std::cmp::Ordering::Greater => break,
                    }
                }
            }
        }
    }
    Ok(())
}
/// Loads `(type_id, updated_at, weight)` for a batch of nodes into
/// `results[slot]`.
///
/// `lookups` holds `(result slot, node id)` pairs and is expected to be
/// sorted ascending by node id: its first and last ids are used as the batch
/// bounds and the merge walk only moves forward. Tombstoned ids leave their
/// slot untouched.
///
/// Note the asymmetry between strategies for ids missing from the index: the
/// per-key search path writes `None` into the slot, while the merge-walk path
/// leaves the slot as it was.
///
/// # Errors
/// Propagates errors from index reads and metadata lookup.
///
/// # Panics
/// Panics if a slot in `lookups` is out of bounds for `results`.
pub(crate) fn get_node_meta_batch(
    &self,
    lookups: &[(usize, u64)],
    results: &mut [Option<(u32, i64, f32)>],
) -> Result<(), EngineError> {
    let (Some(&(_, min_key)), Some(&(_, max_key))) = (lookups.first(), lookups.last()) else {
        return Ok(());
    };
    let index: &[u8] = &self.nodes_mmap;
    if index.len() < 8 {
        return Ok(());
    }
    let entry_count = read_u64_at(index, 0)? as usize;
    if entry_count == 0 {
        return Ok(());
    }
    const INDEX_START: usize = 8;

    // One for the first lookup plus one for every change between neighbours.
    let unique_keys = 1 + lookups
        .windows(2)
        .filter(|pair| pair[0].1 != pair[1].1)
        .count();
    let strategy = choose_batch_read_strategy(
        index,
        entry_count,
        NODE_INDEX_ENTRY_SIZE,
        0,
        unique_keys,
        min_key,
        max_key,
    )?;

    match strategy {
        BatchReadStrategy::SeekPerKey => {
            // Last searched id and the metadata found for it, reused for repeats.
            let mut memo: Option<(u64, Option<(u32, i64, f32)>)> = None;
            for &(slot, wanted) in lookups {
                if self.deleted_nodes.contains_key(&wanted) {
                    continue;
                }
                let meta = match memo {
                    Some((id, cached)) if id == wanted => cached,
                    _ => {
                        let fresh = match self.binary_search_node_index(wanted)? {
                            Some((index_pos, _)) => {
                                let (_, _, _, type_id, updated_at, weight, _, _, _, _) =
                                    self.node_meta_at(index_pos)?;
                                Some((type_id, updated_at, weight))
                            }
                            None => None,
                        };
                        memo = Some((wanted, fresh));
                        fresh
                    }
                };
                results[slot] = meta;
            }
        }
        BatchReadStrategy::MergeWalk => {
            // Forward-only cursor; it stays on a match so repeated ids hit again.
            let mut cursor = 0usize;
            for &(slot, wanted) in lookups {
                if self.deleted_nodes.contains_key(&wanted) {
                    continue;
                }
                while cursor < entry_count {
                    let entry_off = INDEX_START + cursor * NODE_INDEX_ENTRY_SIZE;
                    let entry_id = read_u64_at(index, entry_off)?;
                    match entry_id.cmp(&wanted) {
                        std::cmp::Ordering::Less => cursor += 1,
                        std::cmp::Ordering::Equal => {
                            let (_, _, _, type_id, updated_at, weight, _, _, _, _) =
                                self.node_meta_at(cursor)?;
                            results[slot] = Some((type_id, updated_at, weight));
                            break;
                        }
                        std::cmp::Ordering::Greater => break,
                    }
                }
            }
        }
    }
    Ok(())
}
/// Scores the dense vectors of the candidate `ids` that live in this segment.
///
/// For every id, in order:
/// * tombstoned in this segment — skipped entirely;
/// * not present in this segment's node index — pushed to `remaining_out`;
/// * present but without a dense vector (`dense_len == 0`) or rejected by
///   `include(type_id, updated_at, weight)` — dropped;
/// * otherwise — scored with `dense_score_from_bytes` and pushed to `hits_out`.
///
/// When the node data has no count header or declares zero entries, every id
/// is pushed to `remaining_out`.
///
/// Assumes `ids` is sorted ascending: the first and last ids are used as the
/// batch bounds and the merge walk only moves forward — TODO confirm callers.
///
/// # Errors
/// Propagates errors from index reads, `read_dense_scoring_meta` and
/// `dense_score_from_bytes`.
#[allow(clippy::too_many_arguments)]
pub(crate) fn score_dense_candidates_sorted<F>(
&self,
ids: &[u64],
query: &[f32],
metric: DenseMetric,
query_norm: Option<f32>,
mut include: F,
hits_out: &mut Vec<VectorHit>,
remaining_out: &mut Vec<u64>,
) -> Result<(), EngineError>
where
F: FnMut(u32, i64, f32) -> bool,
{
if ids.is_empty() {
return Ok(());
}
let data = &self.nodes_mmap[..];
let node_meta = &self.node_meta_mmap[..];
let vector_meta = &self.node_vector_meta_mmap[..];
// No node index here: none of the ids can be answered by this segment.
if data.len() < 8 {
remaining_out.extend_from_slice(ids);
return Ok(());
}
let count = read_u64_at(data, 0)? as usize;
if count == 0 {
remaining_out.extend_from_slice(ids);
return Ok(());
}
let idx_start = 8;
let min_key = ids.first().copied().unwrap_or(0);
let max_key = ids.last().copied().unwrap_or(0);
// Count runs of equal adjacent ids.
let mut unique_keys = 0usize;
let mut prev: Option<u64> = None;
for &id in ids {
if prev != Some(id) {
unique_keys += 1;
prev = Some(id);
}
}
let strategy = choose_batch_read_strategy(
data,
count,
NODE_INDEX_ENTRY_SIZE,
0,
unique_keys,
min_key,
max_key,
)?;
if strategy == BatchReadStrategy::SeekPerKey {
// Reuse the previous lookup when the same id repeats.
let mut prev_id: Option<u64> = None;
let mut prev_found: Option<DenseScoringMeta> = None;
for &target_id in ids {
if self.deleted_nodes.contains_key(&target_id) {
continue;
}
let found = if prev_id == Some(target_id) {
prev_found
} else if let Some((index, _offset)) = self.binary_search_node_index(target_id)? {
let found = Some(read_dense_scoring_meta(node_meta, vector_meta, index)?);
prev_id = Some(target_id);
prev_found = found;
found
} else {
prev_id = Some(target_id);
prev_found = None;
None
};
let Some(found) = found else {
remaining_out.push(target_id);
continue;
};
if found.dense_len == 0 || !include(found.type_id, found.updated_at, found.weight) {
continue;
}
hits_out.push(VectorHit {
node_id: target_id,
score: dense_score_from_bytes(
metric,
query,
query_norm,
&self.node_dense_vectors_mmap,
found.dense_offset,
found.dense_len,
)?,
});
}
} else {
// Forward-only cursor; it stays on a match so repeated ids hit again.
let mut idx_pos = 0usize;
for &target_id in ids {
if self.deleted_nodes.contains_key(&target_id) {
continue;
}
let mut found = None;
while idx_pos < count {
let entry_off = idx_start + idx_pos * NODE_INDEX_ENTRY_SIZE;
let id = read_u64_at(data, entry_off)?;
if id < target_id {
idx_pos += 1;
} else if id == target_id {
found = Some(read_dense_scoring_meta(node_meta, vector_meta, idx_pos)?);
break;
} else {
break;
}
}
let Some(found) = found else {
remaining_out.push(target_id);
continue;
};
if found.dense_len == 0 || !include(found.type_id, found.updated_at, found.weight) {
continue;
}
hits_out.push(VectorHit {
node_id: target_id,
score: dense_score_from_bytes(
metric,
query,
query_norm,
&self.node_dense_vectors_mmap,
found.dense_offset,
found.dense_len,
)?,
});
}
}
Ok(())
}
/// Scores the sparse vectors of the candidate `ids` that live in this segment.
///
/// Returns immediately when `ids` or `query` is empty. Otherwise, for every
/// id, in order:
/// * tombstoned in this segment — skipped entirely;
/// * not present in this segment's node index — pushed to `remaining_out`;
/// * present but without a sparse vector (`sparse_len == 0`) or rejected by
///   `include(type_id, updated_at, weight)` — dropped;
/// * otherwise — scored with `sparse_dot_score_from_blob`; only scores above
///   `0.0` are pushed to `hits_out` as `(node id, score)`.
///
/// When the node data has no count header or declares zero entries, every id
/// is pushed to `remaining_out`.
///
/// Assumes `ids` is sorted ascending: the first and last ids are used as the
/// batch bounds and the merge walk only moves forward — TODO confirm callers.
///
/// # Errors
/// Propagates errors from index reads, `read_sparse_scoring_meta` and
/// `sparse_dot_score_from_blob`.
#[allow(clippy::too_many_arguments)]
pub(crate) fn score_sparse_candidates_sorted<F>(
&self,
ids: &[u64],
query: &[(u32, f32)],
mut include: F,
hits_out: &mut Vec<(u64, f32)>,
remaining_out: &mut Vec<u64>,
) -> Result<(), EngineError>
where
F: FnMut(u32, i64, f32) -> bool,
{
if ids.is_empty() || query.is_empty() {
return Ok(());
}
let data = &self.nodes_mmap[..];
let node_meta = &self.node_meta_mmap[..];
let vector_meta = &self.node_vector_meta_mmap[..];
let sparse_blob = &self.node_sparse_vectors_mmap[..];
// No node index here: none of the ids can be answered by this segment.
if data.len() < 8 {
remaining_out.extend_from_slice(ids);
return Ok(());
}
let count = read_u64_at(data, 0)? as usize;
if count == 0 {
remaining_out.extend_from_slice(ids);
return Ok(());
}
let idx_start = 8;
let min_key = ids.first().copied().unwrap_or(0);
let max_key = ids.last().copied().unwrap_or(0);
// Count runs of equal adjacent ids.
let mut unique_keys = 0usize;
let mut prev: Option<u64> = None;
for &id in ids {
if prev != Some(id) {
unique_keys += 1;
prev = Some(id);
}
}
let strategy = choose_batch_read_strategy(
data,
count,
NODE_INDEX_ENTRY_SIZE,
0,
unique_keys,
min_key,
max_key,
)?;
if strategy == BatchReadStrategy::SeekPerKey {
// Reuse the previous lookup when the same id repeats.
let mut prev_id: Option<u64> = None;
let mut prev_found: Option<SparseScoringMeta> = None;
for &target_id in ids {
if self.deleted_nodes.contains_key(&target_id) {
continue;
}
let found = if prev_id == Some(target_id) {
prev_found
} else if let Some((index, _offset)) = self.binary_search_node_index(target_id)? {
let found = Some(read_sparse_scoring_meta(node_meta, vector_meta, index)?);
prev_id = Some(target_id);
prev_found = found;
found
} else {
prev_id = Some(target_id);
prev_found = None;
None
};
let Some(found) = found else {
remaining_out.push(target_id);
continue;
};
if found.sparse_len == 0 || !include(found.type_id, found.updated_at, found.weight)
{
continue;
}
let score = sparse_dot_score_from_blob(
query,
sparse_blob,
found.sparse_offset,
found.sparse_len,
)?;
// Non-positive scores are not reported as hits.
if score > 0.0 {
hits_out.push((target_id, score));
}
}
} else {
// Forward-only cursor; it stays on a match so repeated ids hit again.
let mut idx_pos = 0usize;
for &target_id in ids {
if self.deleted_nodes.contains_key(&target_id) {
continue;
}
let mut found = None;
while idx_pos < count {
let entry_off = idx_start + idx_pos * NODE_INDEX_ENTRY_SIZE;
let id = read_u64_at(data, entry_off)?;
if id < target_id {
idx_pos += 1;
} else if id == target_id {
found = Some(read_sparse_scoring_meta(node_meta, vector_meta, idx_pos)?);
break;
} else {
break;
}
}
let Some(found) = found else {
remaining_out.push(target_id);
continue;
};
if found.sparse_len == 0 || !include(found.type_id, found.updated_at, found.weight)
{
continue;
}
let score = sparse_dot_score_from_blob(
query,
sparse_blob,
found.sparse_offset,
found.sparse_len,
)?;
// Non-positive scores are not reported as hits.
if score > 0.0 {
hits_out.push((target_id, score));
}
}
}
Ok(())
}
/// Loads a batch of edges, writing each hit into `results[slot]`.
///
/// `lookups` holds `(result slot, edge id)` pairs and is expected to be
/// sorted ascending by edge id: its first and last ids are used as the batch
/// bounds and the merge walk only moves forward. Ids that are tombstoned in
/// this segment or missing from its index leave their slot untouched.
///
/// # Errors
/// Propagates errors from index reads, record decoding and metadata lookup.
///
/// # Panics
/// Panics if a slot in `lookups` is out of bounds for `results`.
pub fn get_edges_batch(
    &self,
    lookups: &[(usize, u64)],
    results: &mut [Option<EdgeRecord>],
) -> Result<(), EngineError> {
    let (Some(&(_, min_key)), Some(&(_, max_key))) = (lookups.first(), lookups.last()) else {
        return Ok(());
    };
    let index: &[u8] = &self.edges_mmap;
    if index.len() < 8 {
        return Ok(());
    }
    let entry_count = read_u64_at(index, 0)? as usize;
    if entry_count == 0 {
        return Ok(());
    }
    const INDEX_START: usize = 8;

    // One for the first lookup plus one for every change between neighbours.
    let unique_keys = 1 + lookups
        .windows(2)
        .filter(|pair| pair[0].1 != pair[1].1)
        .count();
    let strategy = choose_batch_read_strategy(
        index,
        entry_count,
        EDGE_INDEX_ENTRY_SIZE,
        0,
        unique_keys,
        min_key,
        max_key,
    )?;

    match strategy {
        BatchReadStrategy::SeekPerKey => {
            // Last searched id and what the search returned, reused for repeats.
            let mut memo: Option<(u64, Option<(usize, usize)>)> = None;
            for &(slot, wanted) in lookups {
                if self.deleted_edges.contains_key(&wanted) {
                    continue;
                }
                let located = match memo {
                    Some((id, hit)) if id == wanted => hit,
                    _ => {
                        let hit = self.binary_search_edge_index(wanted)?;
                        memo = Some((wanted, hit));
                        hit
                    }
                };
                if let Some((index_pos, record_offset)) = located {
                    let mut edge = decode_edge_at(&self.edges_mmap, record_offset, wanted)?;
                    edge.last_write_seq = self.edge_meta_at(index_pos)?.10;
                    results[slot] = Some(edge);
                }
            }
        }
        BatchReadStrategy::MergeWalk => {
            // Forward-only cursor; it stays on a match so repeated ids hit again.
            let mut cursor = 0usize;
            for &(slot, wanted) in lookups {
                if self.deleted_edges.contains_key(&wanted) {
                    continue;
                }
                while cursor < entry_count {
                    let entry_off = INDEX_START + cursor * EDGE_INDEX_ENTRY_SIZE;
                    let entry_id = read_u64_at(index, entry_off)?;
                    match entry_id.cmp(&wanted) {
                        std::cmp::Ordering::Less => cursor += 1,
                        std::cmp::Ordering::Equal => {
                            let record_offset = read_u64_at(index, entry_off + 8)? as usize;
                            let mut edge =
                                decode_edge_at(&self.edges_mmap, record_offset, entry_id)?;
                            edge.last_write_seq = self.edge_meta_at(cursor)?.10;
                            results[slot] = Some(edge);
                            break;
                        }
                        std::cmp::Ordering::Greater => break,
                    }
                }
            }
        }
    }
    Ok(())
}
/// Returns `true` if this segment holds a tombstone for edge `id`.
pub fn is_edge_deleted(&self, id: u64) -> bool {
self.deleted_edges.contains_key(&id)
}
/// Reports whether the node index of this segment contains `id`.
///
/// Tombstones are not consulted here, and a read error during the search is
/// reported as `false`.
pub fn has_node(&self, id: u64) -> bool {
    matches!(self.binary_search_node_index(id), Ok(Some(_)))
}
/// Reports whether the edge index of this segment contains `id`.
///
/// Tombstones are not consulted here, and a read error during the search is
/// reported as `false`.
pub fn has_edge(&self, id: u64) -> bool {
    matches!(self.binary_search_edge_index(id), Ok(Some(_)))
}
/// Borrows the map of node tombstones recorded in this segment, keyed by node id.
pub fn deleted_node_tombstones(&self) -> &NodeIdMap<TombstoneEntry> {
&self.deleted_nodes
}
/// Borrows the map of edge tombstones recorded in this segment, keyed by edge id.
pub fn deleted_edge_tombstones(&self) -> &NodeIdMap<TombstoneEntry> {
&self.deleted_edges
}
/// Number of node tombstones recorded in this segment.
pub fn deleted_node_count(&self) -> usize {
self.deleted_nodes.len()
}
/// Number of edge tombstones recorded in this segment.
pub fn deleted_edge_count(&self) -> usize {
self.deleted_edges.len()
}
/// Collects the ids of all tombstoned nodes into a freshly allocated set.
pub fn deleted_node_ids(&self) -> NodeIdSet {
self.deleted_nodes.keys().copied().collect()
}
/// Iterates over the ids of tombstoned nodes without allocating a set.
pub(crate) fn deleted_node_id_iter(&self) -> impl Iterator<Item = u64> + '_ {
self.deleted_nodes.keys().copied()
}
/// Collects the ids of all tombstoned edges into a freshly allocated set.
/// (The return type is the `NodeIdSet` alias even though the ids are edge ids.)
pub fn deleted_edge_ids(&self) -> NodeIdSet {
self.deleted_edges.keys().copied().collect()
}
/// Returns the node ids of this segment, computing and caching them on first use.
///
/// The ids are collected from the node file via `collect_node_ids` and
/// stored in the `node_ids` cell; later calls return the cached slice.
///
/// # Errors
/// Propagates the error from `collect_node_ids` when the first collection fails.
pub fn node_ids(&self) -> Result<&[u64], EngineError> {
    if self.node_ids.get().is_none() {
        let collected = collect_node_ids(&self.nodes_mmap)?.into_boxed_slice();
        // `set` only fails when the cell is already filled; whichever value
        // is stored is then returned below, so the result is ignored.
        let _ = self.node_ids.set(collected);
    }
    let cached = self
        .node_ids
        .get()
        .expect("node_ids must be initialized after set");
    Ok(cached.as_ref())
}
/// Collects every node id that appears as a key in the outgoing or incoming
/// adjacency index.
///
/// # Errors
/// Propagates read errors from either index.
pub fn adj_node_ids(&self) -> Result<NodeIdSet, EngineError> {
    let mut ids = NodeIdSet::default();
    // Outgoing first, then incoming; the set absorbs duplicates.
    for index in [&self.adj_out_idx, &self.adj_in_idx] {
        Self::collect_adj_index_node_ids(index, &mut ids)?;
    }
    Ok(ids)
}
/// Adds the key node id of every entry in one adjacency index to `out`.
///
/// The index starts with a `u64` entry count followed by fixed-size entries
/// whose first field is the node id. Consecutive entries with the same node
/// id are inserted only once; this is purely an optimisation because `out`
/// is a set.
///
/// An index shorter than its 8-byte header is treated as empty.
///
/// # Errors
/// Propagates read errors when the declared count exceeds the data.
fn collect_adj_index_node_ids(
    idx_mmap: &MappedData,
    out: &mut NodeIdSet,
) -> Result<(), EngineError> {
    let idx_data = &idx_mmap[..];
    if idx_data.len() < 8 {
        return Ok(());
    }
    let count = read_u64_at(idx_data, 0)? as usize;
    let idx_start = 8;
    // `None` means "no entry seen yet". A `u64::MAX` sentinel would wrongly
    // skip a first entry whose node id is `u64::MAX`.
    let mut prev_node: Option<u64> = None;
    for i in 0..count {
        let entry_off = idx_start + i * ADJ_INDEX_ENTRY_SIZE;
        let node_id = read_u64_at(idx_data, entry_off)?;
        if prev_node != Some(node_id) {
            out.insert(node_id);
            prev_node = Some(node_id);
        }
    }
    Ok(())
}
/// Returns the node count stored on this reader.
pub fn node_count(&self) -> u64 {
self.node_count
}
/// Returns the edge count stored on this reader.
pub fn edge_count(&self) -> u64 {
self.edge_count
}
/// Returns `true` when a degree-delta sidecar is loaded for this segment.
pub(crate) fn degree_delta_available(&self) -> bool {
self.degree_delta.is_some()
}
/// Looks up the degree delta for `node_id` in the sidecar.
///
/// Returns `None` when no sidecar is loaded; otherwise returns whatever the
/// sidecar's `lookup` yields for the node.
pub(crate) fn degree_delta(&self, node_id: u64) -> Option<DegreeDelta> {
self.degree_delta
.as_ref()
.map(|sidecar| sidecar.lookup(node_id))
}
/// Borrows the degree-delta sidecar, if one is loaded.
pub(crate) fn degree_delta_sidecar(&self) -> Option<&DegreeSidecar> {
self.degree_delta.as_ref()
}
/// Returns `true` if this segment records at least one node or edge tombstone.
pub fn has_tombstones(&self) -> bool {
    let no_tombstones = self.deleted_nodes.is_empty() && self.deleted_edges.is_empty();
    !no_tombstones
}
/// Returns the ids stored in the first and last node index entries.
///
/// Returns `None` when the node file is shorter than its header, declares
/// zero entries, or when either entry cannot be read (including a declared
/// count so large that the last entry's offset does not fit in `usize`).
pub fn node_id_range(&self) -> Option<(u64, u64)> {
    let data = &self.nodes_mmap[..];
    if data.len() < 8 {
        return None;
    }
    let count = read_u64_at(data, 0).ok()? as usize;
    // `checked_sub` also covers the empty-index case (count == 0).
    let last_index = count.checked_sub(1)?;
    // The count comes from the file: compute the offset with checked
    // arithmetic so a corrupt value yields `None` instead of a panic (debug)
    // or a wrapped, bogus offset (release).
    let last_off = last_index
        .checked_mul(NODE_INDEX_ENTRY_SIZE)?
        .checked_add(8)?;
    let first_id = read_u64_at(data, 8).ok()?;
    let last_id = read_u64_at(data, last_off).ok()?;
    Some((first_id, last_id))
}
/// Returns the ids stored in the first and last edge index entries.
///
/// Returns `None` when the edge file is shorter than its header, declares
/// zero entries, or when either entry cannot be read (including a declared
/// count so large that the last entry's offset does not fit in `usize`).
pub fn edge_id_range(&self) -> Option<(u64, u64)> {
    let data = &self.edges_mmap[..];
    if data.len() < 8 {
        return None;
    }
    let count = read_u64_at(data, 0).ok()? as usize;
    // `checked_sub` also covers the empty-index case (count == 0).
    let last_index = count.checked_sub(1)?;
    // The count comes from the file: compute the offset with checked
    // arithmetic so a corrupt value yields `None` instead of a panic (debug)
    // or a wrapped, bogus offset (release).
    let last_off = last_index
        .checked_mul(EDGE_INDEX_ENTRY_SIZE)?
        .checked_add(8)?;
    let first_id = read_u64_at(data, 8).ok()?;
    let last_id = read_u64_at(data, last_off).ok()?;
    Some((first_id, last_id))
}
/// Exposes the raw bytes of the mapped node file.
pub(crate) fn raw_nodes_mmap(&self) -> &[u8] {
&self.nodes_mmap[..]
}
/// Exposes the raw bytes of the mapped edge file.
pub(crate) fn raw_edges_mmap(&self) -> &[u8] {
&self.edges_mmap[..]
}
/// Returns the entry count stored in the node metadata header.
///
/// Yields 0 when the file is shorter than the 8-byte header or the header
/// cannot be read.
pub(crate) fn node_meta_count(&self) -> u64 {
    let data = &self.node_meta_mmap[..];
    if data.len() >= 8 {
        read_u64_at(data, 0).unwrap_or(0)
    } else {
        0
    }
}
/// Reads the node metadata entry at position `index`.
///
/// Returns, in order: `(node_id, data_offset, data_len, type_id, updated_at,
/// weight, key_len, prop_hash_offset, prop_hash_count, last_write_seq)`.
/// Entries follow an 8-byte header and are `NODE_META_ENTRY_SIZE` bytes each.
///
/// # Errors
/// Fails with the underlying read error if any field lies outside the data.
// The wide tuple mirrors the on-disk field list; callers destructure it in place.
#[allow(clippy::type_complexity)]
pub(crate) fn node_meta_at(
    &self,
    index: usize,
) -> Result<(u64, u64, u32, u32, i64, f32, u16, u64, u32, u64), EngineError> {
    let data = &self.node_meta_mmap[..];
    let base = 8 + index * NODE_META_ENTRY_SIZE;
    // Tuple fields are evaluated left to right, so reads happen in layout order.
    Ok((
        read_u64_at(data, base)?,      // node_id
        read_u64_at(data, base + 8)?,  // data_offset
        read_u32_at(data, base + 16)?, // data_len
        read_u32_at(data, base + 20)?, // type_id
        read_i64_at(data, base + 24)?, // updated_at
        read_f32_at(data, base + 32)?, // weight
        read_u16_at(data, base + 36)?, // key_len
        read_u64_at(data, base + 38)?, // prop_hash_offset
        read_u32_at(data, base + 46)?, // prop_hash_count
        read_u64_at(data, base + 50)?, // last_write_seq
    ))
}
/// Reads the vector metadata entry at position `index`.
///
/// Returns `(dense_offset, dense_len, sparse_offset, sparse_len)`. The
/// offset/length pair of a vector kind is zeroed unless the entry's flags
/// carry the matching `NODE_VECTOR_FLAG_*` bit. An empty metadata file
/// yields all zeros.
///
/// # Errors
/// Propagates the error from `read_node_vector_meta_entry`.
pub(crate) fn node_vector_meta_at(
    &self,
    index: usize,
) -> Result<(u64, u32, u64, u32), EngineError> {
    let data = &self.node_vector_meta_mmap[..];
    if data.is_empty() {
        return Ok((0, 0, 0, 0));
    }
    let (flags, raw_dense_offset, raw_dense_len, raw_sparse_offset, raw_sparse_len) =
        read_node_vector_meta_entry(data, index)?;
    let (dense_offset, dense_len) = if flags & NODE_VECTOR_FLAG_DENSE != 0 {
        (raw_dense_offset, raw_dense_len)
    } else {
        (0, 0)
    };
    let (sparse_offset, sparse_len) = if flags & NODE_VECTOR_FLAG_SPARSE != 0 {
        (raw_sparse_offset, raw_sparse_len)
    } else {
        (0, 0)
    };
    Ok((dense_offset, dense_len, sparse_offset, sparse_len))
}
/// Returns the entry count stored in the edge metadata header.
///
/// Yields 0 when the file is shorter than the 8-byte header or the header
/// cannot be read.
pub(crate) fn edge_meta_count(&self) -> u64 {
    let data = &self.edge_meta_mmap[..];
    if data.len() >= 8 {
        read_u64_at(data, 0).unwrap_or(0)
    } else {
        0
    }
}
/// Reads the edge metadata entry at position `index`.
///
/// Returns, in order: `(edge_id, data_offset, data_len, from, to, type_id,
/// updated_at, weight, valid_from, valid_to, last_write_seq)`. Entries follow
/// an 8-byte header and are `EDGE_META_ENTRY_SIZE` bytes each.
///
/// # Errors
/// Fails with the underlying read error if any field lies outside the data.
// The wide tuple mirrors the on-disk field list; callers destructure it in place.
#[allow(clippy::type_complexity)]
pub(crate) fn edge_meta_at(
    &self,
    index: usize,
) -> Result<(u64, u64, u32, u64, u64, u32, i64, f32, i64, i64, u64), EngineError> {
    let data = &self.edge_meta_mmap[..];
    let base = 8 + index * EDGE_META_ENTRY_SIZE;
    // Tuple fields are evaluated left to right, so reads happen in layout order.
    Ok((
        read_u64_at(data, base)?,      // edge_id
        read_u64_at(data, base + 8)?,  // data_offset
        read_u32_at(data, base + 16)?, // data_len
        read_u64_at(data, base + 20)?, // from
        read_u64_at(data, base + 28)?, // to
        read_u32_at(data, base + 36)?, // type_id
        read_i64_at(data, base + 40)?, // updated_at
        read_f32_at(data, base + 48)?, // weight
        read_i64_at(data, base + 52)?, // valid_from
        read_i64_at(data, base + 60)?, // valid_to
        read_u64_at(data, base + 68)?, // last_write_seq
    ))
}
/// Exposes the raw bytes of the mapped node property-hash file.
pub(crate) fn raw_node_prop_hashes_mmap(&self) -> &[u8] {
&self.node_prop_hashes_mmap[..]
}
/// Exposes the raw bytes of the mapped dense-vector blob.
pub(crate) fn raw_node_dense_vectors_mmap(&self) -> &[u8] {
&self.node_dense_vectors_mmap[..]
}
/// Exposes the raw bytes of the mapped sparse-vector blob.
pub(crate) fn raw_node_sparse_vectors_mmap(&self) -> &[u8] {
&self.node_sparse_vectors_mmap[..]
}
/// Exposes the raw bytes of the mapped sparse posting index.
pub(crate) fn raw_sparse_posting_index_mmap(&self) -> &[u8] {
&self.sparse_posting_index_mmap[..]
}
/// Exposes the raw bytes of the mapped sparse postings file.
pub(crate) fn raw_sparse_postings_mmap(&self) -> &[u8] {
&self.sparse_postings_mmap[..]
}
/// Returns a copy of the dense HNSW header, or `None` when the reader holds none.
pub(crate) fn dense_hnsw_header(&self) -> Option<DenseHnswHeader> {
self.dense_hnsw_header
}
/// Runs a dense HNSW search over this segment.
///
/// Delegates to `search_dense_hnsw_with_points` using the segment's header,
/// query points, graph bytes and dense-vector blob. Returns an empty list
/// when the segment has no HNSW header.
///
/// # Errors
/// Propagates errors from the underlying search.
pub(crate) fn search_dense_hnsw(
    &self,
    query: &[f32],
    ef_search: usize,
    limit: usize,
) -> Result<Vec<(u64, f32)>, EngineError> {
    match self.dense_hnsw_header {
        None => Ok(Vec::new()),
        Some(header) => search_dense_hnsw_with_points(
            header,
            &self.dense_hnsw_points,
            &self.dense_hnsw_graph_mmap,
            &self.node_dense_vectors_mmap,
            query,
            ef_search,
            limit,
        ),
    }
}
/// Runs a dense HNSW search with `scope_ids` forwarded to the scoped search routine.
///
/// Delegates to `search_dense_hnsw_scoped_with_points` using the segment's
/// header, query points, graph bytes and dense-vector blob. Returns an empty
/// list when the segment has no HNSW header.
///
/// # Errors
/// Propagates errors from the underlying search.
pub(crate) fn search_dense_hnsw_scoped(
    &self,
    query: &[f32],
    ef_search: usize,
    limit: usize,
    scope_ids: &crate::types::NodeIdSet,
) -> Result<Vec<(u64, f32)>, EngineError> {
    match self.dense_hnsw_header {
        None => Ok(Vec::new()),
        Some(header) => search_dense_hnsw_scoped_with_points(
            header,
            &self.dense_hnsw_points,
            &self.dense_hnsw_graph_mmap,
            &self.node_dense_vectors_mmap,
            query,
            ef_search,
            limit,
            scope_ids,
        ),
    }
}
/// Exposes the raw bytes of the mapped dense HNSW metadata file.
pub(crate) fn raw_dense_hnsw_meta_mmap(&self) -> &[u8] {
&self.dense_hnsw_meta_mmap[..]
}
/// Exposes the raw bytes of the mapped dense HNSW graph file (test builds only).
#[cfg(test)]
pub(crate) fn raw_dense_hnsw_graph_mmap(&self) -> &[u8] {
&self.dense_hnsw_graph_mmap[..]
}
/// Fills `node.dense_vector` and `node.sparse_vector` from the vector blobs.
///
/// `index` is the node's position in the vector metadata table. A vector
/// kind whose length is zero leaves the corresponding field untouched.
///
/// The preallocated capacity is capped by what the blob can hold from the
/// given offset, so a corrupt length in the metadata produces a read error
/// rather than a huge allocation attempt.
///
/// # Errors
/// Propagates metadata or blob read errors.
fn hydrate_node_vectors(&self, index: usize, node: &mut NodeRecord) -> Result<(), EngineError> {
    let (dense_offset, dense_len, sparse_offset, sparse_len) =
        self.node_vector_meta_at(index)?;
    if dense_len > 0 {
        let blob: &[u8] = &self.node_dense_vectors_mmap;
        let base = dense_offset as usize;
        let len = dense_len as usize;
        let max_fit = blob.len().saturating_sub(base) / DENSE_VECTOR_VALUE_SIZE;
        let mut values = Vec::with_capacity(len.min(max_fit));
        for i in 0..len {
            values.push(read_f32_at(blob, base + i * DENSE_VECTOR_VALUE_SIZE)?);
        }
        node.dense_vector = Some(values);
    }
    if sparse_len > 0 {
        let blob: &[u8] = &self.node_sparse_vectors_mmap;
        let base = sparse_offset as usize;
        let len = sparse_len as usize;
        let max_fit = blob.len().saturating_sub(base) / SPARSE_VECTOR_ENTRY_SIZE;
        let mut values = Vec::with_capacity(len.min(max_fit));
        for i in 0..len {
            // Each sparse entry is a u32 dimension id followed by an f32 weight.
            let entry_off = base + i * SPARSE_VECTOR_ENTRY_SIZE;
            let dimension_id = read_u32_at(blob, entry_off)?;
            let weight = read_f32_at(blob, entry_off + 4)?;
            values.push((dimension_id, weight));
        }
        node.sparse_vector = Some(values);
    }
    Ok(())
}
/// Decodes every node listed in the node index, including its vectors.
///
/// Tombstones are not consulted. A file shorter than the 8-byte header
/// yields an empty list.
///
/// The preallocated capacity is capped by the number of index entries the
/// file could physically hold, so a corrupt header count produces a read
/// error rather than a capacity-overflow panic or huge allocation.
///
/// # Errors
/// Propagates index read, record decode and vector hydration errors.
pub fn all_nodes(&self) -> Result<Vec<NodeRecord>, EngineError> {
    let data = &self.nodes_mmap[..];
    if data.len() < 8 {
        return Ok(Vec::new());
    }
    let count = read_u64_at(data, 0)? as usize;
    let idx_start = 8usize;
    let max_entries = (data.len() - idx_start) / NODE_INDEX_ENTRY_SIZE;
    let mut nodes = Vec::with_capacity(count.min(max_entries));
    for i in 0..count {
        let entry_off = idx_start + i * NODE_INDEX_ENTRY_SIZE;
        let id = read_u64_at(data, entry_off)?;
        let offset = read_u64_at(data, entry_off + 8)? as usize;
        let mut node = decode_node_at(data, offset, id)?;
        self.hydrate_node_vectors(i, &mut node)?;
        nodes.push(node);
    }
    Ok(nodes)
}
/// Decodes every edge listed in the edge index.
///
/// Tombstones are not consulted. A file shorter than the 8-byte header
/// yields an empty list.
///
/// NOTE(review): records are returned exactly as `decode_edge_at` produces
/// them; this method does not read the edge metadata table — confirm
/// callers do not rely on metadata-derived fields here.
///
/// The preallocated capacity is capped by the number of index entries the
/// file could physically hold, so a corrupt header count produces a read
/// error rather than a capacity-overflow panic or huge allocation.
///
/// # Errors
/// Propagates index read and record decode errors.
pub fn all_edges(&self) -> Result<Vec<EdgeRecord>, EngineError> {
    let data = &self.edges_mmap[..];
    if data.len() < 8 {
        return Ok(Vec::new());
    }
    let count = read_u64_at(data, 0)? as usize;
    let idx_start = 8usize;
    let max_entries = (data.len() - idx_start) / EDGE_INDEX_ENTRY_SIZE;
    let mut edges = Vec::with_capacity(count.min(max_entries));
    for i in 0..count {
        let entry_off = idx_start + i * EDGE_INDEX_ENTRY_SIZE;
        let id = read_u64_at(data, entry_off)?;
        let offset = read_u64_at(data, entry_off + 8)? as usize;
        edges.push(decode_edge_at(data, offset, id)?);
    }
    Ok(edges)
}
/// Returns the ids of non-tombstoned nodes listed under `type_id` in the node type index.
pub fn nodes_by_type(&self, type_id: u32) -> Result<Vec<u64>, EngineError> {
self.query_type_index(&self.node_type_index_mmap, type_id, &self.deleted_nodes)
}
/// Returns the ids of non-tombstoned edges listed under `type_id` in the edge type index.
pub fn edges_by_type(&self, type_id: u32) -> Result<Vec<u64>, EngineError> {
self.query_type_index(&self.edge_type_index_mmap, type_id, &self.deleted_edges)
}
/// Binary-searches a type index for `target_type` and returns its id list,
/// skipping ids present in `deleted`.
///
/// Each index entry holds a `u32` type id, a `u64` offset to the id list and
/// a `u32` id count. The search presumes entries are sorted by type id.
/// A missing type, an empty index, or a file shorter than the header all
/// yield an empty list.
///
/// The preallocated capacity is capped by the number of ids that fit between
/// the list offset and the end of the file, so a corrupt count produces a
/// read error rather than a huge allocation.
///
/// # Errors
/// Propagates read errors.
fn query_type_index(
    &self,
    mmap: &MappedData,
    target_type: u32,
    deleted: &NodeIdMap<TombstoneEntry>,
) -> Result<Vec<u64>, EngineError> {
    let data = &mmap[..];
    if data.len() < 8 {
        return Ok(Vec::new());
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(Vec::new());
    }
    let idx_start = 8;
    let entry_size = TYPE_INDEX_ENTRY_SIZE;
    let mut lo = 0usize;
    let mut hi = count;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let entry_off = idx_start + mid * entry_size;
        let entry_type = read_u32_at(data, entry_off)?;
        if entry_type < target_type {
            lo = mid + 1;
        } else if entry_type > target_type {
            hi = mid;
        } else {
            let offset = read_u64_at(data, entry_off + 4)? as usize;
            let id_count = read_u32_at(data, entry_off + 12)? as usize;
            let max_ids = data.len().saturating_sub(offset) / 8;
            let mut result = Vec::with_capacity(id_count.min(max_ids));
            for i in 0..id_count {
                let id = read_u64_at(data, offset + i * 8)?;
                if !deleted.contains_key(&id) {
                    result.push(id);
                }
            }
            return Ok(result);
        }
    }
    Ok(Vec::new())
}
/// Lists the type ids that have an entry in the node type index, in index order.
pub fn node_type_ids(&self) -> Result<Vec<u32>, EngineError> {
Self::type_ids_from_index(&self.node_type_index_mmap)
}
/// Lists the type ids that have an entry in the edge type index, in index order.
pub fn edge_type_ids(&self) -> Result<Vec<u32>, EngineError> {
Self::type_ids_from_index(&self.edge_type_index_mmap)
}
/// Reads the leading `u32` type id of every entry in a type index.
///
/// A file shorter than the 8-byte header yields an empty list.
///
/// The preallocated capacity is capped by the number of entries the file
/// could physically hold, so a corrupt header count produces a read error
/// rather than a huge allocation.
///
/// # Errors
/// Propagates read errors when the declared count exceeds the data.
fn type_ids_from_index(mmap: &MappedData) -> Result<Vec<u32>, EngineError> {
    let data = &mmap[..];
    if data.len() < 8 {
        return Ok(Vec::new());
    }
    let count = read_u64_at(data, 0)? as usize;
    let idx_start = 8usize;
    let max_entries = (data.len() - idx_start) / TYPE_INDEX_ENTRY_SIZE;
    let mut result = Vec::with_capacity(count.min(max_entries));
    for i in 0..count {
        let entry_off = idx_start + i * TYPE_INDEX_ENTRY_SIZE;
        result.push(read_u32_at(data, entry_off)?);
    }
    Ok(result)
}
/// Returns the ids of non-tombstoned nodes of `type_id` whose indexed
/// timestamp lies in `from_ms..=to_ms`, sorted ascending by id.
///
/// The timestamp index holds 20-byte entries of `(u32 type, i64 time,
/// u64 node id)` after an 8-byte count header. A lower-bound binary search on
/// `(type, time)` finds the start; entries are then scanned forward until the
/// type changes or the time exceeds `to_ms`.
///
/// # Errors
/// Returns `CorruptRecord` when the index is shorter than its header, and
/// propagates read errors.
pub fn nodes_by_time_range(
    &self,
    type_id: u32,
    from_ms: i64,
    to_ms: i64,
) -> Result<Vec<u64>, EngineError> {
    const HEADER_LEN: usize = 8;
    const ENTRY_LEN: usize = 20;
    let data = &self.timestamp_index_mmap[..];
    if data.len() < HEADER_LEN {
        return Err(EngineError::CorruptRecord(
            "timestamp_index.dat missing or truncated (< 8 bytes)".into(),
        ));
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(Vec::new());
    }
    // Lower bound: first entry whose (type, time) is not below the target.
    let lower_key = (type_id, from_ms);
    let (mut lo, mut hi) = (0usize, count);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let off = HEADER_LEN + mid * ENTRY_LEN;
        let key = (read_u32_at(data, off)?, read_i64_at(data, off + 4)?);
        if key < lower_key {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }
    let mut matches = Vec::new();
    for pos in lo..count {
        let off = HEADER_LEN + pos * ENTRY_LEN;
        if read_u32_at(data, off)? != type_id {
            break;
        }
        if read_i64_at(data, off + 4)? > to_ms {
            break;
        }
        let node_id = read_u64_at(data, off + 12)?;
        if !self.deleted_nodes.contains_key(&node_id) {
            matches.push(node_id);
        }
    }
    matches.sort_unstable();
    Ok(matches)
}
/// Finds the edge registered for the `(from, to, type_id)` triple.
///
/// Binary-searches the triple index, comparing entries lexicographically by
/// `(from, to, type)`. On a match the stored edge id is resolved through
/// `get_edge`. Returns `Ok(None)` when the index is missing, empty, or has
/// no such triple.
///
/// # Errors
/// Propagates read errors and any error from `get_edge`.
pub fn edge_by_triple(
    &self,
    from: u64,
    to: u64,
    type_id: u32,
) -> Result<Option<EdgeRecord>, EngineError> {
    let data = &self.edge_triple_index_mmap[..];
    if data.len() < 8 {
        return Ok(None);
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(None);
    }
    let wanted = (from, to, type_id);
    let (mut lo, mut hi) = (0usize, count);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let off = 8 + mid * EDGE_TRIPLE_ENTRY_SIZE;
        let entry = (
            read_u64_at(data, off)?,
            read_u64_at(data, off + 8)?,
            read_u32_at(data, off + 16)?,
        );
        // Tuple ordering is lexicographic: from, then to, then type.
        match entry.cmp(&wanted) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                let edge_id = read_u64_at(data, off + 20)?;
                return self.get_edge(edge_id);
            }
        }
    }
    Ok(None)
}
/// Returns the ids of non-tombstoned nodes indexed under
/// `(type_id, key_hash, value_hash)` in the property index.
///
/// Binary-searches entries compared lexicographically by
/// `(type, key hash, value hash)`; a matching entry points at a list of
/// `u64` ids. A missing or empty index, or no matching entry, yields an
/// empty list.
///
/// The preallocated capacity is capped by the number of ids that fit between
/// the list offset and the end of the file, so a corrupt count produces a
/// read error rather than a huge allocation.
///
/// # Errors
/// Propagates read errors.
pub fn find_nodes_by_prop_hash(
    &self,
    type_id: u32,
    key_hash: u64,
    value_hash: u64,
) -> Result<Vec<u64>, EngineError> {
    let data = &self.prop_node_index_mmap[..];
    if data.len() < 8 {
        return Ok(Vec::new());
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(Vec::new());
    }
    let idx_start = 8;
    let wanted = (type_id, key_hash, value_hash);
    let mut lo = 0usize;
    let mut hi = count;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let entry_off = idx_start + mid * PROP_INDEX_ENTRY_SIZE;
        let e_type = read_u32_at(data, entry_off)?;
        let e_key_hash = read_u64_at(data, entry_off + 4)?;
        let e_val_hash = read_u64_at(data, entry_off + 12)?;
        match (e_type, e_key_hash, e_val_hash).cmp(&wanted) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                let offset = read_u64_at(data, entry_off + 20)? as usize;
                let id_count = read_u32_at(data, entry_off + 28)? as usize;
                let max_ids = data.len().saturating_sub(offset) / 8;
                let mut result = Vec::with_capacity(id_count.min(max_ids));
                for i in 0..id_count {
                    let id = read_u64_at(data, offset + i * 8)?;
                    if !self.deleted_nodes.contains_key(&id) {
                        result.push(id);
                    }
                }
                return Ok(result);
            }
        }
    }
    Ok(Vec::new())
}
/// Builds the path of the equality sidecar for `index_id`:
/// `<seg_dir>/secondary_indexes/node_prop_eq_<index_id>.dat`.
fn secondary_eq_sidecar_path(&self, index_id: u64) -> PathBuf {
    let file_name = format!("node_prop_eq_{}.dat", index_id);
    let mut path = self.seg_dir.join("secondary_indexes");
    path.push(file_name);
    path
}
/// Builds the path of the range sidecar for `index_id`:
/// `<seg_dir>/secondary_indexes/node_prop_range_<index_id>.dat`.
fn secondary_range_sidecar_path(&self, index_id: u64) -> PathBuf {
    let file_name = format!("node_prop_range_{}.dat", index_id);
    let mut path = self.seg_dir.join("secondary_indexes");
    path.push(file_name);
    path
}
/// Runs `callback` over the bytes of the equality sidecar for `index_id`.
///
/// Returns `Ok(None)` when the sidecar file does not exist (any cached
/// mapping for it is dropped in that case). Otherwise the file is mapped and
/// cached on first use, validated once via
/// `validate_secondary_eq_sidecar_data`, and `callback` is invoked with the
/// mapped bytes; its result is returned as `Ok(Some(..))`.
///
/// The cache mutex is held for the whole call, including while `callback`
/// runs, so `callback` must not call back into this method for the same
/// cache.
///
/// # Errors
/// Returns mapping errors other than "not found", the validation error (the
/// cache entry is evicted so a later call retries), or the callback's error.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
fn with_secondary_eq_sidecar<T>(
&self,
index_id: u64,
callback: impl FnOnce(&[u8]) -> Result<T, EngineError>,
) -> Result<Option<T>, EngineError> {
let path = self.secondary_eq_sidecar_path(index_id);
if !path.exists() {
// File is gone: make sure a stale mapping does not linger in the cache.
self.secondary_eq_sidecars.lock().unwrap().remove(&index_id);
return Ok(None);
}
let mut cache = self.secondary_eq_sidecars.lock().unwrap();
if let std::collections::hash_map::Entry::Vacant(entry) = cache.entry(index_id) {
let data = match mmap_file(&path) {
Ok(data) => data,
// The file may vanish between the `exists` check and the mapping.
Err(EngineError::IoError(error))
if error.kind() == std::io::ErrorKind::NotFound =>
{
return Ok(None);
}
Err(error) => return Err(error),
};
entry.insert(SecondaryEqSidecarCacheEntry {
data,
validated: false,
});
}
// Validate in an inner scope so the mutable borrow of the entry ends
// before the cache is modified or re-borrowed below.
let validation_error = {
let entry = cache
.get_mut(&index_id)
.expect("secondary equality sidecar cache entry must exist");
if entry.validated {
None
} else {
match validate_secondary_eq_sidecar_data(&entry.data) {
Ok(()) => {
entry.validated = true;
None
}
Err(error) => Some(error),
}
}
};
if let Some(error) = validation_error {
// Drop the invalid mapping so it is not served to later callers.
cache.remove(&index_id);
return Err(error);
}
let data = &cache
.get(&index_id)
.expect("secondary equality sidecar cache entry must exist")
.data[..];
Ok(Some(callback(data)?))
}
/// Runs `callback` over the bytes of the range sidecar for `index_id`.
///
/// Returns `Ok(None)` when the sidecar file does not exist (any cached
/// mapping for it is dropped in that case). Otherwise the file is mapped and
/// cached on first use, validated once via
/// `validate_secondary_range_sidecar_data`, and `callback` is invoked with
/// the mapped bytes; its result is returned as `Ok(Some(..))`.
///
/// The cache mutex is held for the whole call, including while `callback`
/// runs, so `callback` must not call back into this method for the same
/// cache.
///
/// # Errors
/// Returns mapping errors other than "not found", the validation error (the
/// cache entry is evicted so a later call retries), or the callback's error.
///
/// # Panics
/// Panics if the cache mutex is poisoned.
fn with_secondary_range_sidecar<T>(
&self,
index_id: u64,
callback: impl FnOnce(&[u8]) -> Result<T, EngineError>,
) -> Result<Option<T>, EngineError> {
let path = self.secondary_range_sidecar_path(index_id);
if !path.exists() {
// File is gone: make sure a stale mapping does not linger in the cache.
self.secondary_range_sidecars
.lock()
.unwrap()
.remove(&index_id);
return Ok(None);
}
let mut cache = self.secondary_range_sidecars.lock().unwrap();
if let std::collections::hash_map::Entry::Vacant(entry) = cache.entry(index_id) {
let data = match mmap_file(&path) {
Ok(data) => data,
// The file may vanish between the `exists` check and the mapping.
Err(EngineError::IoError(error))
if error.kind() == std::io::ErrorKind::NotFound =>
{
return Ok(None);
}
Err(error) => return Err(error),
};
entry.insert(SecondaryRangeSidecarCacheEntry {
data,
validated: false,
});
}
// Validate in an inner scope so the mutable borrow of the entry ends
// before the cache is modified or re-borrowed below.
let validation_error = {
let entry = cache
.get_mut(&index_id)
.expect("secondary range sidecar cache entry must exist");
if entry.validated {
None
} else {
match validate_secondary_range_sidecar_data(&entry.data) {
Ok(()) => {
entry.validated = true;
None
}
Err(error) => Some(error),
}
}
};
if let Some(error) = validation_error {
// Drop the invalid mapping so it is not served to later callers.
cache.remove(&index_id);
return Err(error);
}
let data = &cache
.get(&index_id)
.expect("secondary range sidecar cache entry must exist")
.data[..];
Ok(Some(callback(data)?))
}
/// Validates the equality sidecar for `index_id`.
///
/// Returns `Ok(true)` when the sidecar exists and passes validation,
/// `Ok(false)` when there is no sidecar file.
///
/// # Errors
/// Propagates mapping and validation errors.
pub(crate) fn validate_secondary_eq_sidecar(&self, index_id: u64) -> Result<bool, EngineError> {
    let outcome = self.with_secondary_eq_sidecar(index_id, validate_secondary_eq_sidecar_data)?;
    Ok(outcome.is_some())
}
/// Test helper: like `find_nodes_by_secondary_eq_index_if_present`, but a
/// missing sidecar is reported as an empty list.
#[cfg(test)]
pub(crate) fn find_nodes_by_secondary_eq_index(
    &self,
    index_id: u64,
    value_hash: u64,
) -> Result<Vec<u64>, EngineError> {
    let found = self.find_nodes_by_secondary_eq_index_if_present(index_id, value_hash)?;
    Ok(found.unwrap_or_default())
}
/// Looks up `value_hash` in the equality sidecar for `index_id`.
///
/// Returns `Ok(None)` when the sidecar file does not exist; otherwise the
/// result of `find_nodes_in_secondary_eq_sidecar`, which is given this
/// segment's node tombstones.
pub(crate) fn find_nodes_by_secondary_eq_index_if_present(
&self,
index_id: u64,
value_hash: u64,
) -> Result<Option<Vec<u64>>, EngineError> {
self.with_secondary_eq_sidecar(index_id, |data| {
find_nodes_in_secondary_eq_sidecar(data, &self.deleted_nodes, value_hash)
})
}
/// Invokes `callback(value_hash, ids)` for every group in the equality
/// sidecar for `index_id`, in file order.
///
/// Each group entry holds a `u64` value hash, a `u64` offset to its id list
/// and a `u32` id count. Tombstones are not filtered here. Returns
/// `Ok(true)` if the sidecar exists and `Ok(false)` if it does not.
///
/// # Errors
/// Propagates sidecar access errors, read errors, and the first error
/// returned by `callback`.
pub(crate) fn for_each_secondary_eq_group<F>(
    &self,
    index_id: u64,
    mut callback: F,
) -> Result<bool, EngineError>
where
    F: FnMut(u64, &[u64]) -> Result<(), EngineError>,
{
    let visited = self.with_secondary_eq_sidecar(index_id, |data| {
        let group_count = read_u64_at(data, 0)? as usize;
        for group in 0..group_count {
            let entry_off = 8 + group * SECONDARY_EQ_ENTRY_SIZE;
            let value_hash = read_u64_at(data, entry_off)?;
            let ids_off = read_u64_at(data, entry_off + 8)? as usize;
            let id_count = read_u32_at(data, entry_off + 16)? as usize;
            let mut ids = Vec::with_capacity(id_count);
            for k in 0..id_count {
                ids.push(read_u64_at(data, ids_off + k * 8)?);
            }
            callback(value_hash, &ids)?;
        }
        Ok(())
    })?;
    Ok(visited.is_some())
}
/// Validates the range sidecar for `index_id`.
///
/// Returns `Ok(true)` when the sidecar exists and passes validation,
/// `Ok(false)` when there is no sidecar file.
///
/// # Errors
/// Propagates mapping and validation errors.
pub(crate) fn validate_secondary_range_sidecar(
    &self,
    index_id: u64,
) -> Result<bool, EngineError> {
    let outcome =
        self.with_secondary_range_sidecar(index_id, validate_secondary_range_sidecar_data)?;
    Ok(outcome.is_some())
}
/// Test helper: range lookup without a result limit. Forwards to
/// `find_nodes_by_secondary_range_index_if_present_limited` with `limit = None`.
#[cfg(test)]
pub(crate) fn find_nodes_by_secondary_range_index_if_present(
&self,
index_id: u64,
lower: Option<(u64, bool)>,
upper: Option<(u64, bool)>,
after: Option<(u64, u64)>,
) -> Result<Option<Vec<(u64, u64)>>, EngineError> {
self.find_nodes_by_secondary_range_index_if_present_limited(
index_id, lower, upper, after, None,
)
}
/// Queries the range sidecar for `index_id`.
///
/// Returns `Ok(None)` when the sidecar file does not exist; otherwise the
/// result of `find_nodes_in_secondary_range_sidecar`, which receives this
/// segment's node tombstones together with `lower`, `upper`, `after` and
/// `limit` unchanged.
/// NOTE(review): the `bool` in `lower`/`upper` presumably marks the bound as
/// inclusive, and `after` looks like a resume cursor — confirm against
/// `find_nodes_in_secondary_range_sidecar`.
pub(crate) fn find_nodes_by_secondary_range_index_if_present_limited(
&self,
index_id: u64,
lower: Option<(u64, bool)>,
upper: Option<(u64, bool)>,
after: Option<(u64, u64)>,
limit: Option<usize>,
) -> Result<Option<Vec<(u64, u64)>>, EngineError> {
self.with_secondary_range_sidecar(index_id, |data| {
find_nodes_in_secondary_range_sidecar(
data,
&self.deleted_nodes,
lower,
upper,
after,
limit,
)
})
}
/// Invokes `callback(encoded_value, node_id)` for every entry in the range
/// sidecar for `index_id`, in file order.
///
/// Tombstones are not filtered here. Returns `Ok(true)` if the sidecar
/// exists and `Ok(false)` if it does not.
///
/// # Errors
/// Propagates sidecar access errors, read errors, and the first error
/// returned by `callback`.
pub(crate) fn for_each_secondary_range_entry<F>(
    &self,
    index_id: u64,
    mut callback: F,
) -> Result<bool, EngineError>
where
    F: FnMut(u64, u64) -> Result<(), EngineError>,
{
    let visited = self.with_secondary_range_sidecar(index_id, |data| {
        let entry_count = read_u64_at(data, 0)? as usize;
        for position in 0..entry_count {
            let entry_off = 8 + position * SECONDARY_RANGE_ENTRY_SIZE;
            let encoded_value = read_u64_at(data, entry_off)?;
            let node_id = read_u64_at(data, entry_off + 8)?;
            callback(encoded_value, node_id)?;
        }
        Ok(())
    })?;
    Ok(visited.is_some())
}
/// Decodes a single property of the node record stored at `data_offset` in
/// the node file.
///
/// Thin wrapper over `decode_node_property_at`; `node_id` and `prop_key` are
/// passed through unchanged.
pub(crate) fn node_property_value_at_offset(
&self,
node_id: u64,
data_offset: u64,
prop_key: &str,
) -> Result<Option<PropValue>, EngineError> {
decode_node_property_at(&self.nodes_mmap, data_offset as usize, node_id, prop_key)
}
/// Binary-searches the node index for `target_id`.
///
/// On a hit returns `(entry position, record offset)`, where the offset is
/// the `u64` stored 8 bytes into the entry. Returns `Ok(None)` when the id
/// is absent or the file is missing/empty.
///
/// # Errors
/// Propagates read errors.
fn binary_search_node_index(
    &self,
    target_id: u64,
) -> Result<Option<(usize, usize)>, EngineError> {
    let data = &self.nodes_mmap[..];
    if data.len() < 8 {
        return Ok(None);
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(None);
    }
    let (mut lo, mut hi) = (0usize, count);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let entry_off = 8 + mid * NODE_INDEX_ENTRY_SIZE;
        match read_u64_at(data, entry_off)?.cmp(&target_id) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                let offset = read_u64_at(data, entry_off + 8)? as usize;
                return Ok(Some((mid, offset)));
            }
        }
    }
    Ok(None)
}
/// Binary-searches the edge index for `target_id`.
///
/// On a hit returns `(entry position, record offset)`, where the offset is
/// the `u64` stored 8 bytes into the entry. Returns `Ok(None)` when the id
/// is absent or the file is missing/empty.
///
/// # Errors
/// Propagates read errors.
fn binary_search_edge_index(
    &self,
    target_id: u64,
) -> Result<Option<(usize, usize)>, EngineError> {
    let data = &self.edges_mmap[..];
    if data.len() < 8 {
        return Ok(None);
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(None);
    }
    let (mut lo, mut hi) = (0usize, count);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let entry_off = 8 + mid * EDGE_INDEX_ENTRY_SIZE;
        match read_u64_at(data, entry_off)?.cmp(&target_id) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                let offset = read_u64_at(data, entry_off + 8)? as usize;
                return Ok(Some((mid, offset)));
            }
        }
    }
    Ok(None)
}
/// Binary-searches the key index for `(target_type, target_key)` and returns
/// the node id stored in the matching entry.
///
/// After the count header comes a table of `u64` entry offsets. Each entry
/// stores a `u32` type id at +0, a `u64` node id at +4, a `u16` key length
/// at +12 and the key bytes from +14. Entries are compared by type id first,
/// then by key string.
///
/// # Errors
/// Propagates read errors, and returns `CorruptRecord` when a probed key is
/// not valid UTF-8.
fn binary_search_key_index(
    &self,
    target_type: u32,
    target_key: &str,
) -> Result<Option<u64>, EngineError> {
    let data = &self.key_index_mmap[..];
    if data.len() < 8 {
        return Ok(None);
    }
    let count = read_u64_at(data, 0)? as usize;
    if count == 0 {
        return Ok(None);
    }
    let (mut lo, mut hi) = (0usize, count);
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let entry_offset = read_u64_at(data, 8 + mid * 8)? as usize;
        let entry_type = read_u32_at(data, entry_offset)?;
        let key_len = read_u16_at(data, entry_offset + 12)? as usize;
        let key_start = entry_offset + 14;
        let key_bytes = read_bytes_at(data, key_start, key_len)?;
        // The key is decoded before the comparison, so a corrupt key is
        // reported even when the type id alone would decide the ordering.
        let entry_key = std::str::from_utf8(key_bytes).map_err(|_| {
            EngineError::CorruptRecord(format!(
                "invalid UTF-8 in key index at offset {}",
                key_start
            ))
        })?;
        match (entry_type, entry_key).cmp(&(target_type, target_key)) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                return Ok(Some(read_u64_at(data, entry_offset + 4)?));
            }
        }
    }
    Ok(None)
}
/// Finds the position of the first adjacency index entry keyed by
/// `target_node_id`.
///
/// On a match the search keeps narrowing to the left, so the returned
/// position is the leftmost entry for the node. Returns `Ok(None)` when the
/// node has no entry or the index is missing/empty.
///
/// # Errors
/// Propagates read errors.
fn find_first_adj_entry(
    &self,
    idx_data: &[u8],
    target_node_id: u64,
) -> Result<Option<usize>, EngineError> {
    if idx_data.len() < 8 {
        return Ok(None);
    }
    let count = read_u64_at(idx_data, 0)? as usize;
    if count == 0 {
        return Ok(None);
    }
    let (mut lo, mut hi) = (0usize, count);
    let mut leftmost = None;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let node_id = read_u64_at(idx_data, 8 + mid * ADJ_INDEX_ENTRY_SIZE)?;
        match node_id.cmp(&target_node_id) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                // Remember the hit, then look for an earlier one.
                leftmost = Some(mid);
                hi = mid;
            }
        }
    }
    Ok(leftmost)
}
#[allow(clippy::too_many_arguments)]
fn collect_adj_neighbors(
&self,
idx_mmap: &MappedData,
dat_mmap: &MappedData,
node_id: u64,
type_filter: Option<&[u32]>,
limit: usize,
mut record_self_loop_edge_ids: Option<&mut NodeIdSet>,
skip_self_loop_edge_ids: Option<&NodeIdSet>,
mut raw_budget: Option<&mut usize>,
results: &mut Vec<NeighborEntry>,
) -> Result<(), EngineError> {
let idx_data = &idx_mmap[..];
let dat_data = &dat_mmap[..];
let first = match self.find_first_adj_entry(idx_data, node_id)? {
Some(i) => i,
None => return Ok(()),
};
let count = read_u64_at(idx_data, 0)? as usize;
let idx_start = 8;
for i in first..count {
if let Some(remaining) = raw_budget.as_ref() {
if **remaining == 0 {
break;
}
} else if limit > 0 && results.len() >= limit {
break;
}
let entry_off = idx_start + i * ADJ_INDEX_ENTRY_SIZE;
let entry_node = read_u64_at(idx_data, entry_off)?;
if entry_node != node_id {
break;
}
let entry_type = read_u32_at(idx_data, entry_off + 8)?;
let posting_offset = read_u64_at(idx_data, entry_off + 12)? as usize;
let posting_count = read_u32_at(idx_data, entry_off + 20)? as usize;
if let Some(types) = type_filter {
if !types.contains(&entry_type) {
continue;
}
}
let mut cur_off = posting_offset;
let mut prev_edge_id: u64 = 0;
for _j in 0..posting_count {
if let Some(remaining) = raw_budget.as_ref() {
if **remaining == 0 {
break;
}
} else if limit > 0 && results.len() >= limit {
break;
}
let (delta, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let edge_id = prev_edge_id + delta;
prev_edge_id = edge_id;
let (neighbor_id, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let weight = read_f32_at(dat_data, cur_off)?;
cur_off += 4;
let (vf_enc, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let valid_from = vf_enc as i64;
let (vt_enc, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let valid_to = if vt_enc == 0 {
i64::MAX
} else {
(vt_enc - 1) as i64
};
if self.deleted_edges.contains_key(&edge_id) {
continue;
}
if self.deleted_nodes.contains_key(&neighbor_id) {
continue;
}
if let Some(remaining) = raw_budget.as_deref_mut() {
*remaining = remaining.saturating_sub(1);
}
if neighbor_id == node_id {
if let Some(skip) = skip_self_loop_edge_ids {
if skip.contains(&edge_id) {
continue;
}
}
if let Some(record) = record_self_loop_edge_ids.as_deref_mut() {
record.insert(edge_id);
}
}
results.push(NeighborEntry {
node_id: neighbor_id,
edge_id,
edge_type_id: entry_type,
weight,
valid_from,
valid_to,
});
}
}
Ok(())
}
pub fn for_each_adj_posting<F>(
&self,
node_id: u64,
direction: Direction,
type_filter: Option<&[u32]>,
callback: &mut F,
) -> Result<ControlFlow<()>, EngineError>
where
F: FnMut(u64, u64, f32, i64, i64) -> ControlFlow<()>,
{
match direction {
Direction::Outgoing => self.decode_adj_postings_cb(
&self.adj_out_idx,
&self.adj_out_dat,
node_id,
type_filter,
callback,
),
Direction::Incoming => self.decode_adj_postings_cb(
&self.adj_in_idx,
&self.adj_in_dat,
node_id,
type_filter,
callback,
),
Direction::Both => {
if self
.decode_adj_postings_cb(
&self.adj_out_idx,
&self.adj_out_dat,
node_id,
type_filter,
callback,
)?
.is_break()
{
return Ok(ControlFlow::Break(()));
}
self.decode_adj_postings_cb(
&self.adj_in_idx,
&self.adj_in_dat,
node_id,
type_filter,
callback,
)
}
}
}
fn decode_adj_postings_cb<F>(
&self,
idx_mmap: &MappedData,
dat_mmap: &MappedData,
node_id: u64,
type_filter: Option<&[u32]>,
callback: &mut F,
) -> Result<ControlFlow<()>, EngineError>
where
F: FnMut(u64, u64, f32, i64, i64) -> ControlFlow<()>,
{
let idx_data = &idx_mmap[..];
let dat_data = &dat_mmap[..];
let first = match self.find_first_adj_entry(idx_data, node_id)? {
Some(i) => i,
None => return Ok(ControlFlow::Continue(())),
};
let count = read_u64_at(idx_data, 0)? as usize;
let idx_start = 8;
for i in first..count {
let entry_off = idx_start + i * ADJ_INDEX_ENTRY_SIZE;
let entry_node = read_u64_at(idx_data, entry_off)?;
if entry_node != node_id {
break;
}
let entry_type = read_u32_at(idx_data, entry_off + 8)?;
let posting_offset = read_u64_at(idx_data, entry_off + 12)? as usize;
let posting_count = read_u32_at(idx_data, entry_off + 20)? as usize;
if let Some(types) = type_filter {
if !types.contains(&entry_type) {
continue;
}
}
let mut cur_off = posting_offset;
let mut prev_edge_id: u64 = 0;
for _ in 0..posting_count {
let (delta, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let edge_id = prev_edge_id + delta;
prev_edge_id = edge_id;
let (neighbor_id, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let weight = read_f32_at(dat_data, cur_off)?;
cur_off += 4;
let (valid_from_raw, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let (vt_enc, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let valid_to = if vt_enc == 0 {
i64::MAX
} else {
(vt_enc - 1) as i64
};
if self.deleted_edges.contains_key(&edge_id) {
continue;
}
if self.deleted_nodes.contains_key(&neighbor_id) {
continue;
}
if callback(
edge_id,
neighbor_id,
weight,
valid_from_raw as i64,
valid_to,
)
.is_break()
{
return Ok(ControlFlow::Break(()));
}
}
}
Ok(ControlFlow::Continue(()))
}
/// Collects the neighbors of every id in `node_ids`, grouped by node id.
///
/// Outgoing and/or incoming adjacency is scanned according to `direction`.
/// For `Direction::Both` the outgoing entries come first and each node's
/// list is then de-duplicated by edge id, keeping the first occurrence.
/// Nodes without any matching index entry have no key in the result.
///
/// # Errors
/// Propagates errors from `collect_adj_neighbors_batch`.
pub fn neighbors_batch(
    &self,
    node_ids: &[u64],
    direction: Direction,
    type_filter: Option<&[u32]>,
) -> Result<NodeIdMap<Vec<NeighborEntry>>, EngineError> {
    let (scan_outgoing, scan_incoming) = match direction {
        Direction::Outgoing => (true, false),
        Direction::Incoming => (false, true),
        Direction::Both => (true, true),
    };
    let mut results: NodeIdMap<Vec<NeighborEntry>> =
        NodeIdMap::with_capacity_and_hasher(node_ids.len(), Default::default());
    if scan_outgoing {
        self.collect_adj_neighbors_batch(
            &self.adj_out_idx,
            &self.adj_out_dat,
            node_ids,
            type_filter,
            &mut results,
        )?;
    }
    if scan_incoming {
        self.collect_adj_neighbors_batch(
            &self.adj_in_idx,
            &self.adj_in_dat,
            node_ids,
            type_filter,
            &mut results,
        )?;
    }
    if scan_outgoing && scan_incoming {
        // The same edge id can be collected from both passes; keep the first.
        for entries in results.values_mut() {
            let mut seen_edges = NodeIdSet::default();
            entries.retain(|entry| seen_edges.insert(entry.edge_id));
        }
    }
    Ok(results)
}
fn collect_adj_neighbors_batch(
&self,
idx_mmap: &MappedData,
dat_mmap: &MappedData,
node_ids: &[u64],
type_filter: Option<&[u32]>,
results: &mut NodeIdMap<Vec<NeighborEntry>>,
) -> Result<(), EngineError> {
let idx_data = &idx_mmap[..];
let dat_data = &dat_mmap[..];
if idx_data.len() < 8 {
return Ok(());
}
let count = read_u64_at(idx_data, 0)? as usize;
if count == 0 {
return Ok(());
}
let idx_start = 8;
let min_key = node_ids.first().copied().unwrap_or(0);
let max_key = node_ids.last().copied().unwrap_or(0);
let unique_keys = {
let mut n = 0usize;
let mut prev: Option<u64> = None;
for &id in node_ids {
if prev != Some(id) {
n += 1;
prev = Some(id);
}
}
n
};
let use_seek = choose_batch_read_strategy(
idx_data,
count,
ADJ_INDEX_ENTRY_SIZE,
0,
unique_keys,
min_key,
max_key,
)? == BatchReadStrategy::SeekPerKey;
let mut idx_pos = 0usize;
for &target_id in node_ids {
if use_seek {
idx_pos = match self.find_first_adj_entry(idx_data, target_id)? {
Some(pos) => pos,
None => continue,
};
} else {
while idx_pos < count {
let entry_off = idx_start + idx_pos * ADJ_INDEX_ENTRY_SIZE;
let entry_node = read_u64_at(idx_data, entry_off)?;
if entry_node < target_id {
idx_pos += 1;
} else {
break;
}
}
}
while idx_pos < count {
let entry_off = idx_start + idx_pos * ADJ_INDEX_ENTRY_SIZE;
let entry_node = read_u64_at(idx_data, entry_off)?;
if entry_node != target_id {
break;
}
let entry_type = read_u32_at(idx_data, entry_off + 8)?;
let posting_offset = read_u64_at(idx_data, entry_off + 12)? as usize;
let posting_count = read_u32_at(idx_data, entry_off + 20)? as usize;
idx_pos += 1;
if let Some(types) = type_filter {
if !types.contains(&entry_type) {
continue;
}
}
let entries = results.entry(target_id).or_default();
let mut cur_off = posting_offset;
let mut prev_edge_id: u64 = 0;
for _ in 0..posting_count {
let (delta, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let edge_id = prev_edge_id + delta;
prev_edge_id = edge_id;
let (neighbor_id, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let weight = read_f32_at(dat_data, cur_off)?;
cur_off += 4;
let (vf_enc, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let valid_from = vf_enc as i64;
let (vt_enc, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let valid_to = if vt_enc == 0 {
i64::MAX
} else {
(vt_enc - 1) as i64
};
if self.deleted_edges.contains_key(&edge_id) {
continue;
}
if self.deleted_nodes.contains_key(&neighbor_id) {
continue;
}
entries.push(NeighborEntry {
node_id: neighbor_id,
edge_id,
edge_type_id: entry_type,
weight,
valid_from,
valid_to,
});
}
}
}
Ok(())
}
/// Streams the live adjacency postings of every node in `node_ids` to
/// `callback` without materializing them.
///
/// The callback receives `(source node id, edge id, neighbor id, weight,
/// valid_from, valid_to)` and may return `ControlFlow::Break(())` to stop the
/// scan early. For `Direction::Both` the outgoing file is scanned first and
/// the incoming file is only scanned if the callback did not break.
///
/// Returns the final control-flow state so callers can tell whether the scan
/// ran to completion.
///
/// # Errors
///
/// Propagates any error raised while decoding the adjacency files.
pub fn for_each_adj_posting_batch<F>(
    &self,
    node_ids: &[u64],
    direction: Direction,
    type_filter: Option<&[u32]>,
    callback: &mut F,
) -> Result<ControlFlow<()>, EngineError>
where
    F: FnMut(u64, u64, u64, f32, i64, i64) -> ControlFlow<()>,
{
    let (scan_outgoing, scan_incoming) = match direction {
        Direction::Outgoing => (true, false),
        Direction::Incoming => (false, true),
        Direction::Both => (true, true),
    };

    if scan_outgoing {
        let flow = self.decode_adj_postings_batch_cb(
            &self.adj_out_idx,
            &self.adj_out_dat,
            node_ids,
            type_filter,
            callback,
        )?;
        // Done if the caller asked to stop, or if there is no second pass.
        if flow.is_break() || !scan_incoming {
            return Ok(flow);
        }
    }

    self.decode_adj_postings_batch_cb(
        &self.adj_in_idx,
        &self.adj_in_dat,
        node_ids,
        type_filter,
        callback,
    )
}
fn decode_adj_postings_batch_cb<F>(
&self,
idx_mmap: &MappedData,
dat_mmap: &MappedData,
node_ids: &[u64],
type_filter: Option<&[u32]>,
callback: &mut F,
) -> Result<ControlFlow<()>, EngineError>
where
F: FnMut(u64, u64, u64, f32, i64, i64) -> ControlFlow<()>,
{
let idx_data = &idx_mmap[..];
let dat_data = &dat_mmap[..];
if idx_data.len() < 8 {
return Ok(ControlFlow::Continue(()));
}
let count = read_u64_at(idx_data, 0)? as usize;
if count == 0 {
return Ok(ControlFlow::Continue(()));
}
let idx_start = 8;
let min_key = node_ids.first().copied().unwrap_or(0);
let max_key = node_ids.last().copied().unwrap_or(0);
let use_seek = choose_batch_read_strategy(
idx_data,
count,
ADJ_INDEX_ENTRY_SIZE,
0,
node_ids.len(),
min_key,
max_key,
)? == BatchReadStrategy::SeekPerKey;
let mut idx_pos = 0usize;
for &target_id in node_ids {
if use_seek {
idx_pos = match self.find_first_adj_entry(idx_data, target_id)? {
Some(pos) => pos,
None => continue,
};
} else {
while idx_pos < count {
let entry_off = idx_start + idx_pos * ADJ_INDEX_ENTRY_SIZE;
let entry_node = read_u64_at(idx_data, entry_off)?;
if entry_node < target_id {
idx_pos += 1;
} else {
break;
}
}
}
while idx_pos < count {
let entry_off = idx_start + idx_pos * ADJ_INDEX_ENTRY_SIZE;
let entry_node = read_u64_at(idx_data, entry_off)?;
if entry_node != target_id {
break;
}
let entry_type = read_u32_at(idx_data, entry_off + 8)?;
let posting_offset = read_u64_at(idx_data, entry_off + 12)? as usize;
let posting_count = read_u32_at(idx_data, entry_off + 20)? as usize;
idx_pos += 1;
if let Some(types) = type_filter {
if !types.contains(&entry_type) {
continue;
}
}
let mut cur_off = posting_offset;
let mut prev_edge_id: u64 = 0;
for _ in 0..posting_count {
let (delta, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let edge_id = prev_edge_id + delta;
prev_edge_id = edge_id;
let (neighbor_id, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let weight = read_f32_at(dat_data, cur_off)?;
cur_off += 4;
let (valid_from_raw, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let (vt_enc, n) = read_varint_at(dat_data, cur_off)?;
cur_off += n;
let valid_to = if vt_enc == 0 {
i64::MAX
} else {
(vt_enc - 1) as i64
};
if self.deleted_edges.contains_key(&edge_id) {
continue;
}
if self.deleted_nodes.contains_key(&neighbor_id) {
continue;
}
if callback(
target_id,
edge_id,
neighbor_id,
weight,
valid_from_raw as i64,
valid_to,
)
.is_break()
{
return Ok(ControlFlow::Break(()));
}
}
}
}
Ok(ControlFlow::Continue(()))
}
}
/// Reads and validates the `format.ver` marker of a segment directory.
///
/// The marker must be exactly 8 bytes: the 4-byte `SEGMENT_MAGIC` followed by
/// a little-endian `u32` format version. A missing marker is reported as
/// version 0 being too old.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` when the marker is missing, has the
/// wrong size or magic, or carries a version below 5 or above
/// `SEGMENT_FORMAT_VERSION`. I/O failures while reading are propagated.
fn read_format_version(seg_dir: &Path) -> Result<u32, EngineError> {
    let marker_path = seg_dir.join("format.ver");
    if !marker_path.exists() {
        return Err(EngineError::CorruptRecord(
            "segment format version 0 is too old (minimum supported: 5)".into(),
        ));
    }

    let raw = std::fs::read(&marker_path)?;
    if raw.len() != 8 {
        return Err(EngineError::CorruptRecord(format!(
            "format.ver has invalid size {} (expected 8)",
            raw.len()
        )));
    }

    let (magic, version_bytes) = raw.split_at(4);
    if *magic != SEGMENT_MAGIC {
        return Err(EngineError::CorruptRecord(format!(
            "format.ver has invalid magic {:?} (expected {:?})",
            magic, SEGMENT_MAGIC
        )));
    }

    // `version_bytes` is exactly four bytes long because the size was checked above.
    match u32::from_le_bytes(version_bytes.try_into().unwrap()) {
        version if version < 5 => Err(EngineError::CorruptRecord(format!(
            "segment format version {} is too old (minimum supported: 5)",
            version
        ))),
        version if version > SEGMENT_FORMAT_VERSION => Err(EngineError::CorruptRecord(format!(
            "segment format version {} is newer than supported version {}",
            version, SEGMENT_FORMAT_VERSION
        ))),
        version => Ok(version),
    }
}
/// Maps `path` into memory, treating a path that does not exist as empty data.
///
/// # Errors
///
/// Propagates any error from [`mmap_file`] when the path exists.
fn mmap_file_optional(path: &Path) -> Result<MappedData, EngineError> {
    if path.exists() {
        mmap_file(path)
    } else {
        Ok(MappedData::Empty)
    }
}
/// Opens `path` and maps it read-only into memory.
///
/// A zero-length file yields `MappedData::Empty` and is not mapped.
///
/// # Errors
///
/// Propagates failures from opening the file or reading its metadata, and
/// wraps a mapping failure in `EngineError::IoError`.
fn mmap_file(path: &Path) -> Result<MappedData, EngineError> {
    let handle = File::open(path)?;
    if handle.metadata()?.len() == 0 {
        return Ok(MappedData::Empty);
    }
    // SAFETY: NOTE(review) — a file-backed mapping is only sound while the
    // underlying file is not truncated or rewritten. Presumably segment files
    // are immutable once written; confirm against the writer.
    let mapping = unsafe { Mmap::map(&handle).map_err(EngineError::IoError)? };
    Ok(MappedData::Mmap(mapping))
}
/// Checks the structural invariants of a secondary equality sidecar.
///
/// The file starts with a `u64` group count, followed by that many fixed-width
/// index entries (value hash `u64` at +0, id-list offset `u64` at +8, id count
/// `u32` at +16) and then the id lists themselves (`u64` node ids).
///
/// Verified, in this order for each group: the id list lies after the index
/// and inside the file, value hashes are strictly increasing, the id list does
/// not start before the end of the previous group, and node ids inside the
/// list are strictly increasing.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` describing the first violation found.
fn validate_secondary_eq_sidecar_data(data: &[u8]) -> Result<(), EngineError> {
    let file_len = data.len();
    if file_len < 8 {
        return Err(EngineError::CorruptRecord(
            "secondary equality sidecar missing header".into(),
        ));
    }

    let group_count = read_u64_at(data, 0)? as usize;
    let Some(index_end) = group_count
        .checked_mul(SECONDARY_EQ_ENTRY_SIZE)
        .and_then(|bytes| bytes.checked_add(8))
    else {
        return Err(EngineError::CorruptRecord(
            "secondary equality sidecar index overflow".into(),
        ));
    };
    if index_end > file_len {
        return Err(EngineError::CorruptRecord(format!(
            "secondary equality sidecar index length {} exceeds file length {}",
            index_end, file_len
        )));
    }

    let mut last_hash: Option<u64> = None;
    // The first id list may start no earlier than the end of the index.
    let mut last_group_end = index_end;
    for group in 0..group_count {
        let entry_off = 8 + group * SECONDARY_EQ_ENTRY_SIZE;
        let value_hash = read_u64_at(data, entry_off)?;
        let ids_start = read_u64_at(data, entry_off + 8)? as usize;
        let id_count = read_u32_at(data, entry_off + 16)? as usize;

        let ids_bytes = id_count.checked_mul(8).ok_or_else(|| {
            EngineError::CorruptRecord("secondary equality sidecar group overflow".into())
        })?;
        let ids_end = ids_start.checked_add(ids_bytes).ok_or_else(|| {
            EngineError::CorruptRecord("secondary equality sidecar group end overflow".into())
        })?;

        if ids_start < index_end || ids_end > file_len {
            return Err(EngineError::CorruptRecord(format!(
                "secondary equality sidecar group {} range [{}, {}) exceeds file length {}",
                group, ids_start, ids_end, file_len
            )));
        }
        if last_hash.is_some_and(|previous| value_hash <= previous) {
            return Err(EngineError::CorruptRecord(format!(
                "secondary equality sidecar value hashes are not strictly increasing at group {}",
                group
            )));
        }
        if ids_start < last_group_end {
            return Err(EngineError::CorruptRecord(format!(
                "secondary equality sidecar group {} range [{}, {}) overlaps a previous group",
                group, ids_start, ids_end
            )));
        }

        let mut last_node: Option<u64> = None;
        for slot in 0..id_count {
            let node_id = read_u64_at(data, ids_start + slot * 8)?;
            if last_node.is_some_and(|previous| node_id <= previous) {
                return Err(EngineError::CorruptRecord(format!(
                    "secondary equality sidecar group {} node IDs are not strictly increasing",
                    group
                )));
            }
            last_node = Some(node_id);
        }

        last_hash = Some(value_hash);
        last_group_end = ids_end;
    }
    Ok(())
}
/// Checks the structural invariants of a secondary range sidecar.
///
/// The file must consist of a `u64` entry count followed by exactly that many
/// fixed-width entries, each read here as a pair of `u64`s (at +0 and +8).
/// The pairs must be strictly increasing in lexicographic order.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` when the header is missing, the file
/// length does not match the entry count, or the ordering is violated.
fn validate_secondary_range_sidecar_data(data: &[u8]) -> Result<(), EngineError> {
    if data.len() < 8 {
        return Err(EngineError::CorruptRecord(
            "secondary range sidecar missing header".into(),
        ));
    }

    let entry_count = read_u64_at(data, 0)? as usize;
    let Some(expected_len) = entry_count
        .checked_mul(SECONDARY_RANGE_ENTRY_SIZE)
        .and_then(|bytes| bytes.checked_add(8))
    else {
        return Err(EngineError::CorruptRecord(
            "secondary range sidecar index overflow".into(),
        ));
    };
    if expected_len != data.len() {
        return Err(EngineError::CorruptRecord(format!(
            "secondary range sidecar length {} does not match expected fixed-width length {}",
            data.len(),
            expected_len
        )));
    }

    let mut last_key: Option<(u64, u64)> = None;
    for slot in 0..entry_count {
        let entry_off = 8 + slot * SECONDARY_RANGE_ENTRY_SIZE;
        let key = (
            read_u64_at(data, entry_off)?,
            read_u64_at(data, entry_off + 8)?,
        );
        if last_key.is_some_and(|previous| key <= previous) {
            return Err(EngineError::CorruptRecord(format!(
                "secondary range sidecar entries are not strictly increasing at entry {}",
                slot
            )));
        }
        last_key = Some(key);
    }
    Ok(())
}
/// Looks up `value_hash` in an equality sidecar and returns the node ids of
/// its group, leaving out every id present in `deleted_nodes`.
///
/// The group index is binary-searched by value hash. An empty vector is
/// returned when the sidecar has no groups or the hash is not present.
///
/// # Errors
///
/// Propagates any error from the bounds-checked reads.
fn find_nodes_in_secondary_eq_sidecar(
    data: &[u8],
    deleted_nodes: &NodeIdMap<TombstoneEntry>,
    value_hash: u64,
) -> Result<Vec<u64>, EngineError> {
    let group_count = read_u64_at(data, 0)? as usize;
    if group_count == 0 {
        return Ok(Vec::new());
    }

    let (mut low, mut high) = (0usize, group_count);
    while low < high {
        let probe = low + (high - low) / 2;
        let entry_off = 8 + probe * SECONDARY_EQ_ENTRY_SIZE;
        let probe_hash = read_u64_at(data, entry_off)?;
        if probe_hash < value_hash {
            low = probe + 1;
            continue;
        }
        if probe_hash > value_hash {
            high = probe;
            continue;
        }

        // Exact hit: decode the id list this index entry points at.
        let ids_start = read_u64_at(data, entry_off + 8)? as usize;
        let id_count = read_u32_at(data, entry_off + 16)? as usize;
        let mut live_ids = Vec::with_capacity(id_count);
        for slot in 0..id_count {
            let node_id = read_u64_at(data, ids_start + slot * 8)?;
            if deleted_nodes.contains_key(&node_id) {
                continue;
            }
            live_ids.push(node_id);
        }
        return Ok(live_ids);
    }
    Ok(Vec::new())
}
/// Binary-searches a range sidecar for the first entry at or after `target`.
///
/// Entries are compared as `(u64, u64)` pairs read at +0 and +8 of each
/// fixed-width entry. With `strict == false` the result is the index of the
/// first entry `>= target`; with `strict == true` it is the index of the
/// first entry `> target`. The entry count is returned when no such entry
/// exists.
///
/// # Errors
///
/// Propagates any error from the bounds-checked reads.
fn secondary_range_sidecar_lower_bound(
    data: &[u8],
    target: (u64, u64),
    strict: bool,
) -> Result<usize, EngineError> {
    let entry_count = read_u64_at(data, 0)? as usize;
    let (mut low, mut high) = (0usize, entry_count);
    while low < high {
        let probe = low + (high - low) / 2;
        let entry_off = 8 + probe * SECONDARY_RANGE_ENTRY_SIZE;
        let key = (
            read_u64_at(data, entry_off)?,
            read_u64_at(data, entry_off + 8)?,
        );
        // An entry equal to the target is only skipped for a strict bound.
        let answer_is_right_of_probe = match key.cmp(&target) {
            std::cmp::Ordering::Less => true,
            std::cmp::Ordering::Equal => strict,
            std::cmp::Ordering::Greater => false,
        };
        if answer_is_right_of_probe {
            low = probe + 1;
        } else {
            high = probe;
        }
    }
    Ok(low)
}
/// Scans a range sidecar and returns the `(encoded value, node id)` pairs that
/// fall inside the requested bounds, in file order.
///
/// * `lower` / `upper` — optional bounds on the encoded value; the `bool`
///   marks the bound as inclusive.
/// * `after` — optional pagination cursor; only entries strictly greater than
///   this `(encoded value, node id)` pair are returned.
/// * `limit` — optional maximum number of results. `Some(0)` yields an empty
///   result.
///
/// Entries whose node id is present in `deleted_nodes` are skipped and do not
/// count towards `limit`.
///
/// # Errors
///
/// Propagates any error from the bounds-checked reads.
fn find_nodes_in_secondary_range_sidecar(
    data: &[u8],
    deleted_nodes: &NodeIdMap<TombstoneEntry>,
    lower: Option<(u64, bool)>,
    upper: Option<(u64, bool)>,
    after: Option<(u64, u64)>,
    limit: Option<usize>,
) -> Result<Vec<(u64, u64)>, EngineError> {
    let count = read_u64_at(data, 0)? as usize;
    // The limit is only checked after a push inside the loop, so a zero limit
    // must be handled up front or one entry would slip through.
    if count == 0 || limit == Some(0) {
        return Ok(Vec::new());
    }

    // Each constraint is expressed as (search key, strict). An inclusive lower
    // bound starts at the smallest pair with that value; an exclusive one
    // starts strictly after the largest pair with that value.
    let lower_start = lower.map(|(encoded_value, inclusive)| {
        if inclusive {
            ((encoded_value, 0), false)
        } else {
            ((encoded_value, u64::MAX), true)
        }
    });
    let after_start = after.map(|cursor| (cursor, true));

    // When both constraints exist the later key wins; on a tie the result is
    // strict if either constraint is.
    let start: Option<((u64, u64), bool)> = match (lower_start, after_start) {
        (Some(from_lower), Some(from_after)) => Some(match from_lower.0.cmp(&from_after.0) {
            std::cmp::Ordering::Greater => from_lower,
            std::cmp::Ordering::Less => from_after,
            std::cmp::Ordering::Equal => (from_lower.0, from_lower.1 || from_after.1),
        }),
        (from_lower, from_after) => from_lower.or(from_after),
    };

    let start_index = match start {
        Some((target, strict)) => secondary_range_sidecar_lower_bound(data, target, strict)?,
        None => 0,
    };

    let mut results = Vec::new();
    for index in start_index..count {
        let entry_off = 8 + index * SECONDARY_RANGE_ENTRY_SIZE;
        let encoded_value = read_u64_at(data, entry_off)?;
        let node_id = read_u64_at(data, entry_off + 8)?;
        if let Some((upper_value, inclusive)) = upper {
            // Entries are scanned in file order, so the first one past the
            // upper bound ends the scan.
            if encoded_value > upper_value || (!inclusive && encoded_value == upper_value) {
                break;
            }
        }
        if deleted_nodes.contains_key(&node_id) {
            continue;
        }
        results.push((encoded_value, node_id));
        if limit.is_some_and(|limit| results.len() >= limit) {
            break;
        }
    }
    Ok(results)
}
/// Deserialization seed that extracts a single property, named `target`, from
/// a serialized property map.
struct PropLookupSeed<'a> {
    /// Name of the property to extract.
    target: &'a str,
}

impl<'de> DeserializeSeed<'de> for PropLookupSeed<'_> {
    /// `Some(value)` when the map contains `target`, otherwise `None`.
    type Value = Option<PropValue>;

    /// Drives the deserializer as a map and delegates to [`PropLookupVisitor`].
    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let Self { target } = self;
        let visitor = PropLookupVisitor { target };
        deserializer.deserialize_map(visitor)
    }
}
/// Map visitor that keeps the value stored under `target` and discards every
/// other value without building it.
struct PropLookupVisitor<'a> {
    /// Name of the property to extract.
    target: &'a str,
}

impl<'de> Visitor<'de> for PropLookupVisitor<'_> {
    type Value = Option<PropValue>;

    fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("a node property map")
    }

    /// Walks the whole map. If `target` occurs more than once, the last
    /// occurrence wins.
    fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
    where
        M: MapAccess<'de>,
    {
        let mut hit: Option<PropValue> = None;
        loop {
            match map.next_key::<String>()? {
                None => break,
                Some(name) if name == self.target => {
                    hit = Some(map.next_value()?);
                }
                Some(_) => {
                    // The value still has to be consumed to reach the next key.
                    map.next_value::<IgnoredAny>()?;
                }
            }
        }
        Ok(hit)
    }
}
fn decode_node_property_at(
data: &[u8],
offset: usize,
id: u64,
prop_key: &str,
) -> Result<Option<PropValue>, EngineError> {
let _type_id = read_u32_at(data, offset)?;
let key_len = read_u16_at(data, offset + 4)? as usize;
let _key_bytes = read_bytes_at(data, offset + 6, key_len)?;
let pos = offset + 6 + key_len;
let props_len = read_u32_at(data, pos + 20)? as usize;
let props_bytes = read_bytes_at(data, pos + 24, props_len)?;
let mut deserializer = rmp_serde::Deserializer::from_read_ref(props_bytes);
PropLookupSeed { target: prop_key }
.deserialize(&mut deserializer)
.map_err(|error| {
EngineError::CorruptRecord(format!(
"node {} targeted props decode at offset {}: {}",
id,
pos + 24,
error
))
})
}
/// Validates the node vector metadata file against the dense and sparse
/// vector blobs and reports how many nodes carry each kind of vector.
///
/// Checks performed:
/// * empty metadata requires both blobs to be empty;
/// * the metadata entry count equals `expected_count` and the file length
///   matches that count exactly;
/// * each entry only uses the dense/sparse flag bits, and carries no
///   offset/length for a kind whose flag is clear;
/// * payloads of each kind are laid out back to back, starting at offset 0,
///   and lie inside their blob;
/// * a blob exists if and only if at least one entry references it, and has
///   no bytes beyond the last referenced payload.
///
/// `segment_id` is only used in error messages.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` describing the first violation, and
/// propagates failures from the bounds-checked reads. All size arithmetic is
/// checked, so oversized counts or lengths are reported as corruption instead
/// of overflowing.
fn validate_node_vector_sidecars(
    segment_id: u64,
    vector_meta: &[u8],
    dense_blob: &[u8],
    sparse_blob: &[u8],
    expected_count: u64,
) -> Result<NodeVectorSidecarSummary, EngineError> {
    if vector_meta.is_empty() {
        if !dense_blob.is_empty() || !sparse_blob.is_empty() {
            return Err(EngineError::CorruptRecord(format!(
                "segment {} has vector blobs without node vector metadata",
                segment_id
            )));
        }
        return Ok(NodeVectorSidecarSummary {
            dense_count: 0,
            sparse_count: 0,
        });
    }
    if vector_meta.len() < 8 {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} node vector metadata too short: {} bytes",
            segment_id,
            vector_meta.len()
        )));
    }
    let count = read_u64_at(vector_meta, 0)?;
    if count != expected_count {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} node vector metadata count {} does not match node metadata count {}",
            segment_id, count, expected_count
        )));
    }

    // Both the multiplication and the addition must be checked; guarding only
    // the addition would let an oversized count overflow first.
    let size_overflow =
        || EngineError::CorruptRecord("node vector metadata size overflow".into());
    let entry_count = usize::try_from(count).map_err(|_| size_overflow())?;
    let expected_len = entry_count
        .checked_mul(NODE_VECTOR_META_ENTRY_SIZE)
        .and_then(|bytes| bytes.checked_add(8))
        .ok_or_else(size_overflow)?;
    if vector_meta.len() != expected_len {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} node vector metadata size {} does not match expected {}",
            segment_id,
            vector_meta.len(),
            expected_len
        )));
    }

    let mut has_dense = false;
    let mut has_sparse = false;
    // Payloads must be contiguous, so each kind tracks where the next one starts.
    let mut next_dense_offset = 0usize;
    let mut next_sparse_offset = 0usize;
    let mut dense_count = 0usize;
    let mut sparse_count = 0usize;
    for index in 0..entry_count {
        let (flags, dense_offset, dense_len, sparse_offset, sparse_len) =
            read_node_vector_meta_entry(vector_meta, index)?;
        if flags & !(NODE_VECTOR_FLAG_DENSE | NODE_VECTOR_FLAG_SPARSE) != 0 {
            return Err(EngineError::CorruptRecord(format!(
                "segment {} node vector entry {} has invalid flags {:#010b}",
                segment_id, index, flags
            )));
        }

        if flags & NODE_VECTOR_FLAG_DENSE == 0 {
            if dense_offset != 0 || dense_len != 0 {
                return Err(EngineError::CorruptRecord(format!(
                    "segment {} node vector entry {} has dense payload without dense flag",
                    segment_id, index
                )));
            }
        } else {
            has_dense = true;
            dense_count += 1;
            let dense_offset = usize::try_from(dense_offset).map_err(|_| {
                EngineError::CorruptRecord(format!(
                    "segment {} node vector entry {} dense offset {} exceeds addressable range",
                    segment_id, index, dense_offset
                ))
            })?;
            if dense_offset != next_dense_offset {
                return Err(EngineError::CorruptRecord(format!(
                    "segment {} node vector entry {} dense offset {} does not match expected {}",
                    segment_id, index, dense_offset, next_dense_offset
                )));
            }
            let dense_bytes = (dense_len as usize)
                .checked_mul(DENSE_VECTOR_VALUE_SIZE)
                .ok_or_else(|| EngineError::CorruptRecord("dense blob size overflow".into()))?;
            validate_blob_range(
                dense_blob,
                dense_offset as u64,
                dense_bytes,
                "dense",
                segment_id,
                index,
            )?;
            next_dense_offset = next_dense_offset
                .checked_add(dense_bytes)
                .ok_or_else(|| EngineError::CorruptRecord("dense blob size overflow".into()))?;
        }

        if flags & NODE_VECTOR_FLAG_SPARSE == 0 {
            if sparse_offset != 0 || sparse_len != 0 {
                return Err(EngineError::CorruptRecord(format!(
                    "segment {} node vector entry {} has sparse payload without sparse flag",
                    segment_id, index
                )));
            }
        } else {
            has_sparse = true;
            sparse_count += 1;
            let sparse_offset = usize::try_from(sparse_offset).map_err(|_| {
                EngineError::CorruptRecord(format!(
                    "segment {} node vector entry {} sparse offset {} exceeds addressable range",
                    segment_id, index, sparse_offset
                ))
            })?;
            if sparse_offset != next_sparse_offset {
                return Err(EngineError::CorruptRecord(format!(
                    "segment {} node vector entry {} sparse offset {} does not match expected {}",
                    segment_id, index, sparse_offset, next_sparse_offset
                )));
            }
            let sparse_bytes = (sparse_len as usize)
                .checked_mul(SPARSE_VECTOR_ENTRY_SIZE)
                .ok_or_else(|| EngineError::CorruptRecord("sparse blob size overflow".into()))?;
            validate_blob_range(
                sparse_blob,
                sparse_offset as u64,
                sparse_bytes,
                "sparse",
                segment_id,
                index,
            )?;
            next_sparse_offset = next_sparse_offset
                .checked_add(sparse_bytes)
                .ok_or_else(|| EngineError::CorruptRecord("sparse blob size overflow".into()))?;
        }
    }

    if has_dense && dense_blob.is_empty() {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} references dense vectors but dense blob is missing",
            segment_id
        )));
    }
    if has_sparse && sparse_blob.is_empty() {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} references sparse vectors but sparse blob is missing",
            segment_id
        )));
    }
    if !has_dense && !dense_blob.is_empty() {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} has orphaned dense vector blob",
            segment_id
        )));
    }
    if !has_sparse && !sparse_blob.is_empty() {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} has orphaned sparse vector blob",
            segment_id
        )));
    }
    if has_dense && next_dense_offset != dense_blob.len() {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} dense vector blob has trailing or unreferenced bytes: expected {}, got {}",
            segment_id,
            next_dense_offset,
            dense_blob.len()
        )));
    }
    if has_sparse && next_sparse_offset != sparse_blob.len() {
        return Err(EngineError::CorruptRecord(format!(
            "segment {} sparse vector blob has trailing or unreferenced bytes: expected {}, got {}",
            segment_id,
            next_sparse_offset,
            sparse_blob.len()
        )));
    }
    Ok(NodeVectorSidecarSummary {
        dense_count,
        sparse_count,
    })
}
/// Result of `validate_node_vector_sidecars`: how many node vector metadata
/// entries carry each kind of vector.
struct NodeVectorSidecarSummary {
/// Number of entries with the dense flag set.
dense_count: usize,
/// Number of entries with the sparse flag set.
sparse_count: usize,
}
/// Verifies that the sparse posting files describe exactly the same
/// `(dimension, node, weight)` triples as the per-node sparse vector payloads.
///
/// The expected postings are rebuilt from `node_meta`, `vector_meta` and
/// `sparse_blob`, then compared against the groups returned by
/// `read_sparse_posting_groups` for `sparse_posting_index` / `sparse_postings`.
/// `segment_id` is only used in error messages.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` when posting files exist without
/// vector metadata, when a payload weight is negative, or when the two
/// representations disagree. Read errors are propagated.
fn validate_sparse_posting_parity(
segment_id: u64,
node_meta: &[u8],
vector_meta: &[u8],
sparse_blob: &[u8],
sparse_posting_index: &[u8],
sparse_postings: &[u8],
) -> Result<(), EngineError> {
// Without vector metadata there must be no posting files either.
if vector_meta.is_empty() {
if !sparse_posting_index.is_empty() || !sparse_postings.is_empty() {
return Err(EngineError::CorruptRecord(format!(
"segment {} has sparse posting files without node vector metadata",
segment_id
)));
}
return Ok(());
}
// dimension id -> (node id, weight) pairs, appended in node-metadata order.
let mut expected = BTreeMap::<u32, Vec<(u64, f32)>>::new();
let count = read_u64_at(node_meta, 0)? as usize;
for index in 0..count {
// The node id is the first field of each node metadata entry.
let node_id = read_u64_at(node_meta, 8 + index * NODE_META_ENTRY_SIZE)?;
let (flags, _dense_offset, _dense_len, sparse_offset, sparse_len) =
read_node_vector_meta_entry(vector_meta, index)?;
if flags & NODE_VECTOR_FLAG_SPARSE == 0 {
continue;
}
let base = sparse_offset as usize;
for entry_index in 0..sparse_len as usize {
// Each sparse entry is read as a u32 dimension id followed by an f32 weight.
let entry_off = base + entry_index * SPARSE_VECTOR_ENTRY_SIZE;
let dimension_id = read_u32_at(sparse_blob, entry_off)?;
let weight = read_f32_at(sparse_blob, entry_off + 4)?;
// Note: a NaN weight is not caught here, because `NaN < 0.0` is false.
if weight < 0.0 {
return Err(EngineError::CorruptRecord(format!(
"segment {} sparse vector payload for node {} dimension {} has negative weight",
segment_id, node_id, dimension_id
)));
}
expected
.entry(dimension_id)
.or_default()
.push((node_id, weight));
}
}
let actual = read_sparse_posting_groups(sparse_posting_index, sparse_postings)?;
if expected.len() != actual.len() {
return Err(EngineError::CorruptRecord(format!(
"segment {} sparse posting dimension count {} does not match sparse vector payload count {}",
segment_id,
actual.len(),
expected.len()
)));
}
for (dimension_id, expected_postings) in &expected {
let Some(actual_postings) = actual.get(dimension_id) else {
return Err(EngineError::CorruptRecord(format!(
"segment {} sparse posting files are missing dimension {} from sparse vectors",
segment_id, dimension_id
)));
};
if expected_postings.len() != actual_postings.len() {
return Err(EngineError::CorruptRecord(format!(
"segment {} sparse posting dimension {} count {} does not match sparse vector payload count {}",
segment_id,
dimension_id,
actual_postings.len(),
expected_postings.len()
)));
}
// Postings are compared position by position; weights must match bit for bit.
for (expected_posting, actual_posting) in
expected_postings.iter().zip(actual_postings.iter())
{
if expected_posting.0 != actual_posting.0
|| expected_posting.1.to_bits() != actual_posting.1.to_bits()
{
return Err(EngineError::CorruptRecord(format!(
"segment {} sparse posting dimension {} does not match sparse vector payloads",
segment_id, dimension_id
)));
}
}
}
// NOTE(review): if `actual` is a map with unique keys, this loop looks
// unreachable-in-error, because the sizes matched and every expected key was
// found above — confirm before relying on or removing it.
for dimension_id in actual.keys() {
if !expected.contains_key(dimension_id) {
return Err(EngineError::CorruptRecord(format!(
"segment {} sparse posting dimension {} is not present in sparse vector payloads",
segment_id, dimension_id
)));
}
}
Ok(())
}
/// Checks that the byte range `[offset, offset + len)` lies inside `blob`.
///
/// `kind`, `segment_id` and `index` are only used to build the error message.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` when the range end overflows or
/// exceeds the blob length.
fn validate_blob_range(
    blob: &[u8],
    offset: u64,
    len: usize,
    kind: &str,
    segment_id: u64,
    index: usize,
) -> Result<(), EngineError> {
    let start = offset as usize;
    let Some(end) = start.checked_add(len) else {
        return Err(EngineError::CorruptRecord(format!(
            "{kind} vector range overflow"
        )));
    };
    if end <= blob.len() {
        return Ok(());
    }
    Err(EngineError::CorruptRecord(format!(
        "segment {} node vector entry {} {} range [{}, {}) exceeds blob length {}",
        segment_id,
        index,
        kind,
        start,
        end,
        blob.len()
    )))
}
/// Reads the `index`-th node vector metadata entry.
///
/// Returns `(flags, dense_offset, dense_len, sparse_offset, sparse_len)`,
/// read from byte offsets 0, 4, 12, 16 and 24 of the entry respectively.
/// Entries start after the 8-byte file header.
///
/// # Errors
///
/// Propagates any error from the bounds-checked reads.
fn read_node_vector_meta_entry(
    data: &[u8],
    index: usize,
) -> Result<(u8, u64, u32, u64, u32), EngineError> {
    let entry = 8 + index * NODE_VECTOR_META_ENTRY_SIZE;
    Ok((
        read_u8_at(data, entry)?,
        read_u64_at(data, entry + 4)?,
        read_u32_at(data, entry + 12)?,
        read_u64_at(data, entry + 16)?,
        read_u32_at(data, entry + 24)?,
    ))
}
/// Reads the fields needed to score the dense vector of the node at `index`.
///
/// From the node metadata entry: type id (bytes 20..24), `updated_at`
/// (24..32) and weight (32..36). From the vector metadata entry: dense offset
/// (4..12) and dense length (12..16). All values are little-endian.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` when either entry lies outside its
/// file.
fn read_dense_scoring_meta(
    node_meta: &[u8],
    vector_meta: &[u8],
    index: usize,
) -> Result<DenseScoringMeta, EngineError> {
    let node_start = 8 + index * NODE_META_ENTRY_SIZE;
    let node_entry = match node_start.checked_add(NODE_META_ENTRY_SIZE) {
        None => {
            return Err(EngineError::CorruptRecord(
                "node meta offset overflow".into(),
            ))
        }
        Some(node_end) => match node_meta.get(node_start..node_end) {
            Some(entry) => entry,
            None => {
                return Err(EngineError::CorruptRecord(format!(
                    "node meta read at index {} exceeds data length {}",
                    index,
                    node_meta.len()
                )))
            }
        },
    };

    let vector_start = 8 + index * NODE_VECTOR_META_ENTRY_SIZE;
    let vector_entry = match vector_start.checked_add(NODE_VECTOR_META_ENTRY_SIZE) {
        None => {
            return Err(EngineError::CorruptRecord(
                "node vector meta offset overflow".into(),
            ))
        }
        Some(vector_end) => match vector_meta.get(vector_start..vector_end) {
            Some(entry) => entry,
            None => {
                return Err(EngineError::CorruptRecord(format!(
                    "node vector meta read at index {} exceeds data length {}",
                    index,
                    vector_meta.len()
                )))
            }
        },
    };

    // The sub-slices below have fixed widths, so the conversions cannot fail.
    let type_id = u32::from_le_bytes(node_entry[20..24].try_into().unwrap());
    let updated_at = i64::from_le_bytes(node_entry[24..32].try_into().unwrap());
    let weight = f32::from_le_bytes(node_entry[32..36].try_into().unwrap());
    let dense_offset = u64::from_le_bytes(vector_entry[4..12].try_into().unwrap()) as usize;
    let dense_len = u32::from_le_bytes(vector_entry[12..16].try_into().unwrap()) as usize;
    Ok(DenseScoringMeta {
        type_id,
        updated_at,
        weight,
        dense_offset,
        dense_len,
    })
}
/// Reads the fields needed to score the sparse vector of the node at `index`.
///
/// From the node metadata entry: type id (bytes 20..24), `updated_at`
/// (24..32) and weight (32..36). From the vector metadata entry: sparse
/// offset (16..24) and sparse length (24..28). All values are little-endian.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` when either entry lies outside its
/// file.
fn read_sparse_scoring_meta(
    node_meta: &[u8],
    vector_meta: &[u8],
    index: usize,
) -> Result<SparseScoringMeta, EngineError> {
    let node_start = 8 + index * NODE_META_ENTRY_SIZE;
    let node_entry = match node_start.checked_add(NODE_META_ENTRY_SIZE) {
        None => {
            return Err(EngineError::CorruptRecord(
                "node meta offset overflow".into(),
            ))
        }
        Some(node_end) => match node_meta.get(node_start..node_end) {
            Some(entry) => entry,
            None => {
                return Err(EngineError::CorruptRecord(format!(
                    "node meta read at index {} exceeds data length {}",
                    index,
                    node_meta.len()
                )))
            }
        },
    };

    let vector_start = 8 + index * NODE_VECTOR_META_ENTRY_SIZE;
    let vector_entry = match vector_start.checked_add(NODE_VECTOR_META_ENTRY_SIZE) {
        None => {
            return Err(EngineError::CorruptRecord(
                "node vector meta offset overflow".into(),
            ))
        }
        Some(vector_end) => match vector_meta.get(vector_start..vector_end) {
            Some(entry) => entry,
            None => {
                return Err(EngineError::CorruptRecord(format!(
                    "node vector meta read at index {} exceeds data length {}",
                    index,
                    vector_meta.len()
                )))
            }
        },
    };

    // The sub-slices below have fixed widths, so the conversions cannot fail.
    let type_id = u32::from_le_bytes(node_entry[20..24].try_into().unwrap());
    let updated_at = i64::from_le_bytes(node_entry[24..32].try_into().unwrap());
    let weight = f32::from_le_bytes(node_entry[32..36].try_into().unwrap());
    let sparse_offset = u64::from_le_bytes(vector_entry[16..24].try_into().unwrap()) as usize;
    let sparse_len = u32::from_le_bytes(vector_entry[24..28].try_into().unwrap()) as usize;
    Ok(SparseScoringMeta {
        type_id,
        updated_at,
        weight,
        sparse_offset,
        sparse_len,
    })
}
/// Decodes the node record stored at `offset` in `data`.
///
/// Record layout as read here: type id (`u32`), key length (`u16`), UTF-8 key
/// bytes, `created_at` (`i64`), `updated_at` (`i64`), weight (`f32`),
/// property blob length (`u32`), MessagePack-encoded property map.
///
/// `id` is copied into the result. The record itself does not supply vectors
/// or a write sequence, so `dense_vector` and `sparse_vector` are `None` and
/// `last_write_seq` is 0.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` for an invalid UTF-8 key or an
/// undecodable property map, and propagates bounds-check failures.
fn decode_node_at(data: &[u8], offset: usize, id: u64) -> Result<NodeRecord, EngineError> {
    let type_id = read_u32_at(data, offset)?;

    let key_len = usize::from(read_u16_at(data, offset + 4)?);
    let key_start = offset + 6;
    let key = match std::str::from_utf8(read_bytes_at(data, key_start, key_len)?) {
        Ok(text) => text.to_string(),
        Err(_) => {
            return Err(EngineError::CorruptRecord(format!(
                "invalid UTF-8 in node key at offset {}",
                key_start
            )))
        }
    };

    // Fixed-width fields follow the variable-length key.
    let fixed = key_start + key_len;
    let created_at = read_i64_at(data, fixed)?;
    let updated_at = read_i64_at(data, fixed + 8)?;
    let weight = read_f32_at(data, fixed + 16)?;

    let props_len = read_u32_at(data, fixed + 20)? as usize;
    let props_start = fixed + 24;
    let props_bytes = read_bytes_at(data, props_start, props_len)?;
    let props = match rmp_serde::from_slice::<BTreeMap<String, PropValue>>(props_bytes) {
        Ok(map) => map,
        Err(e) => {
            return Err(EngineError::CorruptRecord(format!(
                "node props decode at offset {}: {}",
                props_start, e
            )))
        }
    };

    Ok(NodeRecord {
        id,
        type_id,
        key,
        props,
        created_at,
        updated_at,
        weight,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    })
}
/// Decodes the edge record stored at `offset` in `data`.
///
/// Record layout as read here (byte offsets relative to `offset`): `from`
/// (`u64`, +0), `to` (`u64`, +8), type id (`u32`, +16), `created_at` (`i64`,
/// +20), `updated_at` (`i64`, +28), weight (`f32`, +36), `valid_from` (`i64`,
/// +40), `valid_to` (`i64`, +48), property blob length (`u32`, +56) and the
/// MessagePack-encoded property map (+60).
///
/// `id` is copied into the result and `last_write_seq` is set to 0.
///
/// # Errors
///
/// Returns `EngineError::CorruptRecord` for an undecodable property map and
/// propagates bounds-check failures.
fn decode_edge_at(data: &[u8], offset: usize, id: u64) -> Result<EdgeRecord, EngineError> {
    // Absolute position of a field given its offset inside the record.
    let field = |relative: usize| offset + relative;

    let from = read_u64_at(data, field(0))?;
    let to = read_u64_at(data, field(8))?;
    let type_id = read_u32_at(data, field(16))?;
    let created_at = read_i64_at(data, field(20))?;
    let updated_at = read_i64_at(data, field(28))?;
    let weight = read_f32_at(data, field(36))?;
    let valid_from = read_i64_at(data, field(40))?;
    let valid_to = read_i64_at(data, field(48))?;

    let props_len = read_u32_at(data, field(56))? as usize;
    let props_start = field(60);
    let props_bytes = read_bytes_at(data, props_start, props_len)?;
    let props = match rmp_serde::from_slice::<BTreeMap<String, PropValue>>(props_bytes) {
        Ok(map) => map,
        Err(e) => {
            return Err(EngineError::CorruptRecord(format!(
                "edge props decode at offset {}: {}",
                props_start, e
            )))
        }
    };

    Ok(EdgeRecord {
        id,
        from,
        to,
        type_id,
        props,
        created_at,
        updated_at,
        weight,
        valid_from,
        valid_to,
        last_write_seq: 0,
    })
}
/// Loads the tombstone file at `path` into `(deleted_nodes, deleted_edges)`.
///
/// The file is read as a `u64` entry count followed by fixed-width entries:
/// kind (`u8`; 0 = node, 1 = edge), id (`u64`), `deleted_at` (`i64`) and
/// `last_write_seq` (`u64`). Entries with any other kind are ignored. A file
/// shorter than the 8-byte header yields two empty maps.
///
/// # Errors
///
/// Propagates I/O failures, and returns `EngineError::CorruptRecord` when an
/// entry announced by the count extends past the end of the file.
fn load_tombstones(
    path: &Path,
) -> Result<(NodeIdMap<TombstoneEntry>, NodeIdMap<TombstoneEntry>), EngineError> {
    let raw = std::fs::read(path)?;
    let mut deleted_nodes: NodeIdMap<TombstoneEntry> = NodeIdMap::default();
    let mut deleted_edges: NodeIdMap<TombstoneEntry> = NodeIdMap::default();
    if raw.len() < 8 {
        return Ok((deleted_nodes, deleted_edges));
    }

    let entry_count = read_u64_at(&raw, 0)? as usize;
    for slot in 0..entry_count {
        let entry_off = 8 + slot * TOMBSTONE_ENTRY_SIZE;
        if entry_off + TOMBSTONE_ENTRY_SIZE > raw.len() {
            return Err(EngineError::CorruptRecord(format!(
                "tombstone entry {} at offset {} exceeds file length {}",
                slot,
                entry_off,
                raw.len()
            )));
        }

        let kind = raw[entry_off];
        let id = read_u64_at(&raw, entry_off + 1)?;
        let deleted_at = read_i64_at(&raw, entry_off + 9)?;
        let last_write_seq = read_u64_at(&raw, entry_off + 17)?;

        let destination = match kind {
            0 => &mut deleted_nodes,
            1 => &mut deleted_edges,
            // Unknown kinds are skipped rather than rejected.
            _ => continue,
        };
        destination.insert(
            id,
            TombstoneEntry {
                deleted_at,
                last_write_seq,
            },
        );
    }
    Ok((deleted_nodes, deleted_edges))
}
/// Computes the dot product between `query` and a sparse vector stored in
/// `sparse_blob`.
///
/// The stored vector has `entry_count` entries starting at `offset`, each read
/// as a `u32` dimension id followed by an `f32` weight. The two sequences are
/// merged with a two-pointer walk, which only lines up matching dimensions
/// when both are sorted by ascending dimension id.
///
/// # Errors
///
/// Propagates any error from the bounds-checked reads.
fn sparse_dot_score_from_blob(
    query: &[(u32, f32)],
    sparse_blob: &[u8],
    offset: usize,
    entry_count: usize,
) -> Result<f32, EngineError> {
    let mut dot = 0.0f32;
    let (mut query_pos, mut blob_pos) = (0usize, 0usize);
    while query_pos < query.len() && blob_pos < entry_count {
        let entry_off = offset + blob_pos * SPARSE_VECTOR_ENTRY_SIZE;
        let blob_dim = read_u32_at(sparse_blob, entry_off)?;
        let blob_weight = read_f32_at(sparse_blob, entry_off + 4)?;
        let (query_dim, query_weight) = query[query_pos];

        if query_dim < blob_dim {
            query_pos += 1;
        } else if query_dim > blob_dim {
            blob_pos += 1;
        } else {
            dot += query_weight * blob_weight;
            query_pos += 1;
            blob_pos += 1;
        }
    }
    Ok(dot)
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::memtable::Memtable;
use crate::segment_writer::write_segment_without_degree_sidecar_for_test as write_segment;
/// Test-only public wrapper around `read_varint_at`.
///
/// Returns the pair produced by `read_varint_at`; the second element is used
/// elsewhere in this file as the number of bytes to advance. Panics when the
/// underlying read fails.
pub fn read_varint_at_pub(data: &[u8], offset: usize) -> (u64, usize) {
read_varint_at(data, offset).unwrap()
}
/// Writes a `format.ver` marker for the current format into `seg_dir`:
/// the segment magic followed by the little-endian format version.
fn write_format_ver(seg_dir: &std::path::Path) {
    use crate::segment_writer::{SEGMENT_FORMAT_VERSION, SEGMENT_MAGIC};
    let version_bytes = SEGMENT_FORMAT_VERSION.to_le_bytes();
    let marker = [&SEGMENT_MAGIC[..], &version_bytes[..]].concat();
    std::fs::write(seg_dir.join("format.ver"), marker).unwrap();
}
/// Builds a property-less test node with fixed timestamps (1000 / 1001) and
/// weight 0.5.
fn make_node(id: u64, type_id: u32, key: &str) -> NodeRecord {
    let key = key.to_string();
    let props = BTreeMap::new();
    NodeRecord {
        id,
        type_id,
        key,
        props,
        created_at: 1000,
        updated_at: 1001,
        weight: 0.5,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    }
}
/// Builds a test node carrying two properties: `name` (the node key as a
/// string) and `score` (0.95). Timestamps are 1000 / 2000 and weight is 0.75.
fn make_node_with_props(id: u64, type_id: u32, key: &str) -> NodeRecord {
    let props = BTreeMap::from([
        ("name".to_string(), PropValue::String(key.to_string())),
        ("score".to_string(), PropValue::Float(0.95)),
    ]);
    NodeRecord {
        id,
        type_id,
        key: key.to_string(),
        props,
        created_at: 1000,
        updated_at: 2000,
        weight: 0.75,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    }
}
/// Builds a property-less test edge with weight 1.0, fixed timestamps
/// (2000 / 2001) and a validity window of `0..=i64::MAX`.
fn make_edge(id: u64, from: u64, to: u64, type_id: u32) -> EdgeRecord {
    let props = BTreeMap::new();
    EdgeRecord {
        id,
        from,
        to,
        type_id,
        props,
        created_at: 2000,
        updated_at: 2001,
        weight: 1.0,
        valid_from: 0,
        valid_to: i64::MAX,
        last_write_seq: 0,
    }
}
/// Writes `mt` to a fresh temporary segment and opens it, without a dense
/// vector configuration. The returned `TempDir` must be kept alive for as long
/// as the reader is used.
fn write_and_open(mt: &Memtable) -> (tempfile::TempDir, SegmentReader) {
write_and_open_with_dense_config(mt, None)
}
/// Writes `mt` as segment 1 into `seg_0001` under a fresh temporary directory
/// and opens it with the same optional dense vector configuration.
///
/// The temporary directory is returned alongside the reader so it is not
/// deleted while the reader is in use.
fn write_and_open_with_dense_config(
    mt: &Memtable,
    dense_config: Option<&DenseVectorConfig>,
) -> (tempfile::TempDir, SegmentReader) {
    let scratch = tempfile::tempdir().unwrap();
    let segment_path = scratch.path().join("seg_0001");
    write_segment(&segment_path, 1, mt, dense_config).unwrap();
    let opened = SegmentReader::open(&segment_path, 1, dense_config).unwrap();
    (scratch, opened)
}
/// Writes a `prop_index.dat` file into `seg_dir` from the given
/// `(type id, key hash, value hash, node ids)` groups.
///
/// Groups are sorted by `(type id, key hash, value hash)`. The file consists
/// of a `u64` group count, one index entry per group (type id, key hash, value
/// hash, id-list offset as `u64`, id count as `u32`), and then the id lists in
/// the same order. The file is flushed and synced before returning.
fn write_legacy_prop_index(seg_dir: &Path, groups: &[(u32, u64, u64, Vec<u64>)]) {
    use std::fs::File;
    use std::io::{BufWriter, Write};

    let mut sorted = groups.to_vec();
    sorted.sort_unstable_by_key(|group| (group.0, group.1, group.2));

    let mut out = BufWriter::new(File::create(seg_dir.join("prop_index.dat")).unwrap());
    out.write_all(&(sorted.len() as u64).to_le_bytes()).unwrap();

    // Id lists start right after the header and the fixed-width index.
    let mut ids_offset = (8 + sorted.len() * PROP_INDEX_ENTRY_SIZE) as u64;
    for (type_id, key_hash, value_hash, ids) in &sorted {
        let id_count = ids.len() as u32;
        out.write_all(&type_id.to_le_bytes()).unwrap();
        out.write_all(&key_hash.to_le_bytes()).unwrap();
        out.write_all(&value_hash.to_le_bytes()).unwrap();
        out.write_all(&ids_offset.to_le_bytes()).unwrap();
        out.write_all(&id_count.to_le_bytes()).unwrap();
        ids_offset += (ids.len() * 8) as u64;
    }

    for id in sorted.iter().flat_map(|group| group.3.iter()) {
        out.write_all(&id.to_le_bytes()).unwrap();
    }

    out.flush().unwrap();
    out.get_ref().sync_all().unwrap();
}
/// Writes `mt` as segment 1 (via the no-degree-sidecar test writer), adds a
/// `prop_index.dat` built from `groups`, and opens a reader on the result.
fn write_and_open_with_legacy_prop_index(
    mt: &Memtable,
    groups: &[(u32, u64, u64, Vec<u64>)],
) -> (tempfile::TempDir, SegmentReader) {
    let tmp = tempfile::tempdir().unwrap();
    let segment_path = tmp.path().join("seg_0001");
    crate::segment_writer::write_segment_without_degree_sidecar_for_test(
        &segment_path,
        1,
        mt,
        None,
    )
    .unwrap();
    write_legacy_prop_index(&segment_path, groups);
    let opened = SegmentReader::open(&segment_path, 1, None).unwrap();
    (tmp, opened)
}
/// Clones `mt`, registers `entry` on the clone, writes the clone as segment 1
/// with `entry` as its only secondary index, and opens a reader on it.
///
/// The caller's memtable is left untouched.
fn write_and_open_with_secondary_eq_sidecar(
    mt: &Memtable,
    entry: &SecondaryIndexManifestEntry,
) -> (tempfile::TempDir, SegmentReader) {
    let tmp = tempfile::tempdir().unwrap();
    let segment_path = tmp.path().join("seg_0001");

    let indexed = mt.clone();
    indexed.register_secondary_index(entry);

    let secondary_indexes = std::slice::from_ref(entry);
    crate::segment_writer::write_segment_without_degree_sidecar_with_secondary_indexes_for_test(
        &segment_path,
        1,
        &indexed,
        None,
        secondary_indexes,
    )
    .unwrap();

    let opened = SegmentReader::open(&segment_path, 1, None).unwrap();
    (tmp, opened)
}
/// Clones `mt`, registers `entry` on the clone, writes the clone as segment 1
/// with `entry` as its only secondary index, and opens a reader on it.
///
/// Used by the range-sidecar tests; the caller's memtable is left untouched.
fn write_and_open_with_secondary_range_sidecar(
    mt: &Memtable,
    entry: &SecondaryIndexManifestEntry,
) -> (tempfile::TempDir, SegmentReader) {
    let tmp = tempfile::tempdir().unwrap();
    let segment_path = tmp.path().join("seg_0001");

    let indexed = mt.clone();
    indexed.register_secondary_index(entry);

    let secondary_indexes = std::slice::from_ref(entry);
    crate::segment_writer::write_segment_without_degree_sidecar_with_secondary_indexes_for_test(
        &segment_path,
        1,
        &indexed,
        None,
        secondary_indexes,
    )
    .unwrap();

    let opened = SegmentReader::open(&segment_path, 1, None).unwrap();
    (tmp, opened)
}
/// Upserts `nodes` into a fresh memtable (using each node's position as its
/// sequence number), writes it as segment 1, and returns the temp dir together
/// with the segment directory path. No reader is opened.
fn write_sparse_segment(nodes: Vec<NodeRecord>) -> (tempfile::TempDir, std::path::PathBuf) {
    let tmp = tempfile::tempdir().unwrap();
    let segment_path = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    nodes.into_iter().zip(0u64..).for_each(|(node, seq)| {
        memtable.apply_op(&WalOp::UpsertNode(node), seq);
    });
    write_segment(&segment_path, 1, &memtable, None).unwrap();
    (tmp, segment_path)
}
/// Returns a cosine-metric `DenseVectorConfig` of the given dimension with the
/// default HNSW settings.
fn dense_config(dimension: u32) -> DenseVectorConfig {
    let metric = DenseMetric::Cosine;
    let hnsw = HnswConfig::default();
    DenseVectorConfig {
        dimension,
        metric,
        hnsw,
    }
}
/// Builds a zero-filled index buffer: an 8-byte little-endian key count
/// followed by `keys.len()` entries of `entry_size` bytes each, with every key
/// stored little-endian at `key_offset` inside its entry.
///
/// Panics if a key does not fit inside the buffer (for example when
/// `entry_size` is too small for the last entry).
fn build_u64_key_index(keys: &[u64], entry_size: usize, key_offset: usize) -> Vec<u8> {
    let count = keys.len();
    let mut buf = vec![0u8; 8 + count * entry_size];
    buf[..8].copy_from_slice(&(count as u64).to_le_bytes());
    for (slot, encoded) in keys.iter().map(|key| key.to_le_bytes()).enumerate() {
        let start = 8 + slot * entry_size + key_offset;
        let end = start + 8;
        buf[start..end].copy_from_slice(&encoded);
    }
    buf
}
/// With a 10,000-key index and trailing arguments `2, 500, 501`, the chosen
/// strategy must be `SeekPerKey`.
// NOTE(review): the trailing arguments look like (requested key count, min key,
// max key) — confirm against `choose_batch_read_strategy`.
#[test]
fn test_batch_strategy_prefers_seek_for_tiny_key_count() {
    let index_keys = (1..=10_000).collect::<Vec<u64>>();
    let index_bytes = build_u64_key_index(&index_keys, NODE_INDEX_ENTRY_SIZE, 0);
    let chosen = choose_batch_read_strategy(
        &index_bytes,
        index_keys.len(),
        NODE_INDEX_ENTRY_SIZE,
        0,
        2,
        500,
        501,
    )
    .unwrap();
    assert_eq!(chosen, BatchReadStrategy::SeekPerKey);
}
/// With a 10,000-key index and trailing arguments `256, 2_000, 2_255`, the
/// chosen strategy must be `MergeWalk`.
// NOTE(review): the trailing arguments look like (requested key count, min key,
// max key) — confirm against `choose_batch_read_strategy`.
#[test]
fn test_batch_strategy_prefers_merge_for_dense_large_range() {
    let index_keys = (1..=10_000).collect::<Vec<u64>>();
    let index_bytes = build_u64_key_index(&index_keys, NODE_INDEX_ENTRY_SIZE, 0);
    let chosen = choose_batch_read_strategy(
        &index_bytes,
        index_keys.len(),
        NODE_INDEX_ENTRY_SIZE,
        0,
        256,
        2_000,
        2_255,
    )
    .unwrap();
    assert_eq!(chosen, BatchReadStrategy::MergeWalk);
}
/// With a 10,000-key index and trailing arguments `64, 100, 9_900`, the chosen
/// strategy must be `SeekPerKey`.
// NOTE(review): the trailing arguments look like (requested key count, min key,
// max key) — confirm against `choose_batch_read_strategy`.
#[test]
fn test_batch_strategy_prefers_seek_for_sparse_range() {
    let index_keys = (1..=10_000).collect::<Vec<u64>>();
    let index_bytes = build_u64_key_index(&index_keys, NODE_INDEX_ENTRY_SIZE, 0);
    let chosen = choose_batch_read_strategy(
        &index_bytes,
        index_keys.len(),
        NODE_INDEX_ENTRY_SIZE,
        0,
        64,
        100,
        9_900,
    )
    .unwrap();
    assert_eq!(chosen, BatchReadStrategy::SeekPerKey);
}
/// A node written to a segment can be fetched by id with its id, type, key,
/// creation timestamp and weight intact.
#[test]
fn test_get_node_found() {
    let memtable = Memtable::new();
    let alice = make_node(42, 1, "alice");
    memtable.apply_op(&WalOp::UpsertNode(alice), 0);

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let fetched = segment.get_node(42).unwrap().unwrap();
    assert_eq!(fetched.id, 42);
    assert_eq!(fetched.type_id, 1);
    assert_eq!(fetched.key, "alice");
    assert_eq!(fetched.created_at, 1000);
    let weight_error = (fetched.weight - 0.5).abs();
    assert!(weight_error < f32::EPSILON);
}
/// Looking up an id that was never written yields `Ok(None)`.
#[test]
fn test_get_node_not_found() {
    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let missing = segment.get_node(999).unwrap();
    assert!(missing.is_none());
}
/// String and float properties survive the segment round trip.
#[test]
fn test_get_node_with_properties() {
    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node_with_props(1, 1, "alice")), 0);

    let (_tmp_guard, segment) = write_and_open(&memtable);
    let fetched = segment.get_node(1).unwrap().unwrap();

    let expected_name = PropValue::String("alice".to_string());
    assert_eq!(fetched.props.get("name"), Some(&expected_name));

    match fetched.props.get("score") {
        Some(PropValue::Float(score)) => {
            assert!((score - 0.95).abs() < f64::EPSILON);
        }
        _ => panic!("expected Float property"),
    }
}
/// Dense and sparse vectors attached to a node are returned unchanged by
/// `get_node` when the segment is written and opened with a dense config.
#[test]
fn test_get_node_with_vectors() {
    let dense = vec![0.1, 0.2, 0.3];
    let sparse = vec![(2, 1.5), (9, 0.25)];

    let memtable = Memtable::new();
    let config = dense_config(3);
    let mut vector_node = make_node(7, 1, "vector");
    vector_node.dense_vector = Some(dense.clone());
    vector_node.sparse_vector = Some(sparse.clone());
    memtable.apply_op(&WalOp::UpsertNode(vector_node), 0);

    let (_tmp_guard, segment) = write_and_open_with_dense_config(&memtable, Some(&config));

    let fetched = segment.get_node(7).unwrap().unwrap();
    assert_eq!(fetched.dense_vector, Some(dense));
    assert_eq!(fetched.sparse_vector, Some(sparse));
}
/// `all_nodes` returns both a node with a dense vector and a plain node, with
/// vector fields populated only where they were set.
#[test]
fn test_all_nodes_hydrates_mixed_vectors() {
    let memtable = Memtable::new();
    let config = dense_config(2);

    let mut vector_node = make_node(1, 1, "with_vectors");
    vector_node.dense_vector = Some(vec![0.5, 0.6]);
    let plain_node = make_node(2, 1, "plain");
    for node in vec![vector_node, plain_node] {
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }

    let (_tmp_guard, segment) = write_and_open_with_dense_config(&memtable, Some(&config));
    let hydrated = segment.all_nodes().unwrap();

    assert_eq!(hydrated.len(), 2);
    let (first, second) = (&hydrated[0], &hydrated[1]);
    assert_eq!(first.dense_vector, Some(vec![0.5, 0.6]));
    assert!(first.sparse_vector.is_none());
    assert!(second.dense_vector.is_none());
    assert!(second.sparse_vector.is_none());
}
/// A node that was upserted and then deleted is invisible to `get_node` and
/// reported as deleted by the segment.
#[test]
fn test_get_node_tombstoned() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "alice")),
        WalOp::DeleteNode {
            id: 1,
            deleted_at: 9999,
        },
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    assert!(segment.get_node(1).unwrap().is_none());
    assert!(segment.is_node_deleted(1));
}
/// An edge written to a segment can be fetched by id with its endpoints and
/// type intact.
#[test]
fn test_get_edge_found() {
    let memtable = Memtable::new();
    let link = make_edge(100, 1, 2, 10);
    memtable.apply_op(&WalOp::UpsertEdge(link), 0);

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let fetched = segment.get_edge(100).unwrap().unwrap();
    assert_eq!(fetched.id, 100);
    assert_eq!(fetched.from, 1);
    assert_eq!(fetched.to, 2);
    assert_eq!(fetched.type_id, 10);
}
/// Looking up an edge in a segment built from an empty memtable yields
/// `Ok(None)`.
#[test]
fn test_get_edge_not_found() {
    let empty = Memtable::new();
    let (_tmp_guard, segment) = write_and_open(&empty);
    let missing = segment.get_edge(1).unwrap();
    assert!(missing.is_none());
}
/// `node_by_key` resolves `(type_id, key)` pairs to the right node, including
/// the same key used under two different type ids.
#[test]
fn test_node_by_key_found() {
    let memtable = Memtable::new();
    let fixtures = [
        make_node(1, 1, "alice"),
        make_node(2, 1, "bob"),
        make_node(3, 2, "alice"),
    ];
    for node in fixtures {
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let alice_t1 = segment.node_by_key(1, "alice").unwrap().unwrap();
    assert_eq!(alice_t1.id, 1);
    let bob_t1 = segment.node_by_key(1, "bob").unwrap().unwrap();
    assert_eq!(bob_t1.id, 2);
    let alice_t2 = segment.node_by_key(2, "alice").unwrap().unwrap();
    assert_eq!(alice_t2.id, 3);
}
/// `node_by_key` yields `Ok(None)` for an unknown key and for a known key
/// under the wrong type id.
#[test]
fn test_node_by_key_not_found() {
    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let unknown_key = segment.node_by_key(1, "bob").unwrap();
    assert!(unknown_key.is_none());
    let wrong_type = segment.node_by_key(2, "alice").unwrap();
    assert!(wrong_type.is_none());
}
/// Outgoing neighbors of node 1 are exactly the targets of its two edges.
#[test]
fn test_neighbors_outgoing() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertNode(make_node(3, 1, "c")),
        WalOp::UpsertEdge(make_edge(1, 1, 2, 10)),
        WalOp::UpsertEdge(make_edge(2, 1, 3, 10)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let outgoing = segment.neighbors(1, Direction::Outgoing, None, 0).unwrap();
    assert_eq!(outgoing.len(), 2);
    let targets: NodeIdSet = outgoing.iter().map(|entry| entry.node_id).collect();
    assert!(targets.contains(&2));
    assert!(targets.contains(&3));
}
/// The incoming neighbor of node 2 is the source of the single edge 1 -> 2.
#[test]
fn test_neighbors_incoming() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertEdge(make_edge(1, 1, 2, 10)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let incoming = segment.neighbors(2, Direction::Incoming, None, 0).unwrap();
    assert_eq!(incoming.len(), 1);
    assert_eq!(incoming[0].node_id, 1);
}
/// An edge-type filter restricts neighbors to edges of the listed types; a
/// type with no edges yields an empty result.
#[test]
fn test_neighbors_with_type_filter() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertNode(make_node(3, 1, "c")),
        WalOp::UpsertEdge(make_edge(1, 1, 2, 10)),
        WalOp::UpsertEdge(make_edge(2, 1, 3, 20)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let via_type_10 = segment
        .neighbors(1, Direction::Outgoing, Some(&[10]), 0)
        .unwrap();
    assert_eq!(via_type_10.len(), 1);
    assert_eq!(via_type_10[0].node_id, 2);

    let via_type_20 = segment
        .neighbors(1, Direction::Outgoing, Some(&[20]), 0)
        .unwrap();
    assert_eq!(via_type_20.len(), 1);
    assert_eq!(via_type_20[0].node_id, 3);

    let via_unknown_type = segment
        .neighbors(1, Direction::Outgoing, Some(&[99]), 0)
        .unwrap();
    assert!(via_unknown_type.is_empty());
}
/// A non-zero limit caps the number of neighbors returned, while a limit of 0
/// returns all five.
#[test]
fn test_neighbors_with_limit() {
    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node(1, 1, "hub")), 0);
    // Spokes 2..=6, each reached from the hub by edge id `spoke - 1`.
    for spoke in 2..=6 {
        let spoke_node = make_node(spoke, 1, &format!("n{}", spoke));
        memtable.apply_op(&WalOp::UpsertNode(spoke_node), 0);
        let hub_edge = make_edge(spoke - 1, 1, spoke, 10);
        memtable.apply_op(&WalOp::UpsertEdge(hub_edge), 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let capped = segment.neighbors(1, Direction::Outgoing, None, 3).unwrap();
    assert_eq!(capped.len(), 3);
    let uncapped = segment.neighbors(1, Direction::Outgoing, None, 0).unwrap();
    assert_eq!(uncapped.len(), 5);
}
/// With a self-loop on node 1 plus two incoming edges, `Direction::Both` with
/// limit 2 returns a single entry: the self-loop edge (id 1).
#[test]
fn test_neighbors_both_with_limit_preserves_self_loop_budget_semantics() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertNode(make_node(3, 1, "c")),
        // Edge 1 is a self-loop on node 1; edges 2 and 3 point into node 1.
        WalOp::UpsertEdge(make_edge(1, 1, 1, 10)),
        WalOp::UpsertEdge(make_edge(2, 2, 1, 10)),
        WalOp::UpsertEdge(make_edge(3, 3, 1, 10)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let limited = segment.neighbors(1, Direction::Both, None, 2).unwrap();
    assert_eq!(limited.len(), 1);
    assert_eq!(limited[0].edge_id, 1);
}
/// A node without any edges has no outgoing neighbors.
#[test]
fn test_neighbors_no_adjacency() {
    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node(1, 1, "lonely")), 0);

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let outgoing = segment.neighbors(1, Direction::Outgoing, None, 0).unwrap();
    assert!(outgoing.is_empty());
}
/// Returning `ControlFlow::Break` from the callback stops
/// `for_each_adj_posting` after the first posting and is propagated to the
/// caller.
#[test]
fn test_for_each_adj_posting_breaks_early() {
    let memtable = Memtable::new();
    for node_id in 1..=4 {
        let node = make_node(node_id, 1, &format!("n{}", node_id));
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }
    let fan_out = [
        make_edge(1, 1, 2, 10),
        make_edge(2, 1, 3, 10),
        make_edge(3, 1, 4, 10),
    ];
    for edge in fan_out {
        memtable.apply_op(&WalOp::UpsertEdge(edge), 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let mut visits = 0usize;
    let outcome = segment
        .for_each_adj_posting(
            1,
            Direction::Outgoing,
            None,
            &mut |_edge_id, _neighbor_id, _weight, _valid_from, _valid_to| {
                visits += 1;
                ControlFlow::Break(())
            },
        )
        .unwrap();

    assert!(matches!(outcome, ControlFlow::Break(())));
    assert_eq!(visits, 1);
}
/// Returning `ControlFlow::Break` from the callback stops
/// `for_each_adj_posting_batch` after the first posting and is propagated to
/// the caller.
#[test]
fn test_for_each_adj_posting_batch_breaks_early() {
    let memtable = Memtable::new();
    for node_id in 1..=4 {
        let node = make_node(node_id, 1, &format!("n{}", node_id));
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }
    let fan_out = [
        make_edge(1, 1, 2, 10),
        make_edge(2, 1, 3, 10),
        make_edge(3, 1, 4, 10),
    ];
    for edge in fan_out {
        memtable.apply_op(&WalOp::UpsertEdge(edge), 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let mut visits = 0usize;
    let outcome = segment
        .for_each_adj_posting_batch(
            &[1],
            Direction::Outgoing,
            None,
            &mut |_node_id, _edge_id, _neighbor_id, _weight, _valid_from, _valid_to| {
                visits += 1;
                ControlFlow::Break(())
            },
        )
        .unwrap();

    assert!(matches!(outcome, ControlFlow::Break(())));
    assert_eq!(visits, 1);
}
/// A segment built from an empty memtable reports zero counts and answers
/// every lookup with "nothing found" rather than an error.
#[test]
fn test_empty_segment_reader() {
    let empty = Memtable::new();
    let (_tmp_guard, segment) = write_and_open(&empty);

    assert_eq!(segment.node_count(), 0);
    assert_eq!(segment.edge_count(), 0);

    let node = segment.get_node(1).unwrap();
    assert!(node.is_none());
    let edge = segment.get_edge(1).unwrap();
    assert!(edge.is_none());
    let outgoing = segment.neighbors(1, Direction::Outgoing, None, 0).unwrap();
    assert!(outgoing.is_empty());
}
/// All 100 written nodes are found by id, and ids just outside the written
/// range (0 and 101) are not.
#[test]
fn test_binary_search_many_nodes() {
    let memtable = Memtable::new();
    for node_id in 1..=100 {
        let node = make_node(node_id, 1, &format!("n{}", node_id));
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    for node_id in 1..=100 {
        let fetched = segment.get_node(node_id).unwrap().unwrap();
        assert_eq!(fetched.id, node_id);
    }
    let below_range = segment.get_node(0).unwrap();
    assert!(below_range.is_none());
    let above_range = segment.get_node(101).unwrap();
    assert!(above_range.is_none());
}
/// Fifty nodes spread over three type ids are all resolvable through
/// `node_by_key`.
#[test]
fn test_binary_search_key_index_many() {
    // Type id cycles through 1..=3 and the key is the zero-padded node id.
    let type_of = |node_id: u64| (node_id % 3) as u32 + 1;
    let key_of = |node_id: u64| format!("key_{:04}", node_id);

    let memtable = Memtable::new();
    for node_id in 1..=50 {
        let node = make_node(node_id, type_of(node_id), &key_of(node_id));
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    for node_id in 1..=50 {
        let fetched = segment
            .node_by_key(type_of(node_id), &key_of(node_id))
            .unwrap()
            .unwrap();
        assert_eq!(fetched.id, node_id);
    }
}
/// End-to-end round trip: nodes with properties, edges, and node/edge
/// tombstones are written to one segment and then checked through id lookup,
/// key lookup, neighbor queries and the deleted-id checks.
#[test]
fn test_full_segment_roundtrip() {
    let memtable = Memtable::new();
    for node_id in 1..=5 {
        let node = make_node_with_props(node_id, 1, &format!("node_{}", node_id));
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }
    let ops = [
        WalOp::UpsertEdge(make_edge(1, 1, 2, 10)),
        WalOp::UpsertEdge(make_edge(2, 2, 3, 10)),
        WalOp::UpsertEdge(make_edge(3, 1, 3, 20)),
        WalOp::UpsertEdge(make_edge(4, 4, 5, 10)),
        // Tombstones for ids that were never upserted in this memtable.
        WalOp::DeleteNode {
            id: 99,
            deleted_at: 9999,
        },
        WalOp::DeleteEdge {
            id: 99,
            deleted_at: 9999,
        },
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    // Nodes: count, key and the "name" property.
    assert_eq!(segment.node_count(), 5);
    for node_id in 1..=5 {
        let fetched = segment.get_node(node_id).unwrap().unwrap();
        assert_eq!(fetched.key, format!("node_{}", node_id));
        let expected_name = PropValue::String(format!("node_{}", node_id));
        assert_eq!(fetched.props.get("name"), Some(&expected_name));
    }

    // Edges: count and endpoints of edge 1.
    assert_eq!(segment.edge_count(), 4);
    let first_edge = segment.get_edge(1).unwrap().unwrap();
    assert_eq!(first_edge.from, 1);
    assert_eq!(first_edge.to, 2);

    // Key index.
    let by_key = segment.node_by_key(1, "node_3").unwrap().unwrap();
    assert_eq!(by_key.id, 3);

    // Adjacency, unfiltered and filtered by edge type.
    let outgoing = segment.neighbors(1, Direction::Outgoing, None, 0).unwrap();
    assert_eq!(outgoing.len(), 2);
    let targets: NodeIdSet = outgoing.iter().map(|entry| entry.node_id).collect();
    assert!(targets.contains(&2));
    assert!(targets.contains(&3));

    let outgoing_type_10 = segment
        .neighbors(1, Direction::Outgoing, Some(&[10]), 0)
        .unwrap();
    assert_eq!(outgoing_type_10.len(), 1);
    assert_eq!(outgoing_type_10[0].node_id, 2);

    // Tombstones.
    assert!(segment.is_node_deleted(99));
    assert!(segment.is_edge_deleted(99));
}
/// A hand-written `prop_index.dat` is consulted by `find_nodes_by_prop_hash`:
/// matching groups return their ids, while an unknown value hash or type id
/// returns nothing.
#[test]
fn test_legacy_prop_index_roundtrip() {
    use crate::types::{hash_prop_key, hash_prop_value};

    let memtable = Memtable::new();
    let fruits = [(1, "apple", "red"), (2, "cherry", "red"), (3, "lime", "green")];
    for (id, key, color) in fruits {
        let mut props = BTreeMap::new();
        props.insert("color".to_string(), PropValue::String(color.to_string()));
        let node = NodeRecord {
            id,
            type_id: 1,
            key: key.to_string(),
            props,
            created_at: 1000,
            updated_at: 1001,
            weight: 0.5,
            dense_vector: None,
            sparse_vector: None,
            last_write_seq: 0,
        };
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }

    let color_key = hash_prop_key("color");
    let red = hash_prop_value(&PropValue::String("red".to_string()));
    let green = hash_prop_value(&PropValue::String("green".to_string()));
    let index_groups = [
        (1, color_key, red, vec![1, 2]),
        (1, color_key, green, vec![3]),
    ];
    let (_tmp_guard, segment) = write_and_open_with_legacy_prop_index(&memtable, &index_groups);

    let mut red_ids = segment.find_nodes_by_prop_hash(1, color_key, red).unwrap();
    red_ids.sort();
    assert_eq!(red_ids, vec![1, 2]);

    let green_ids = segment
        .find_nodes_by_prop_hash(1, color_key, green)
        .unwrap();
    assert_eq!(green_ids, vec![3]);

    let blue = hash_prop_value(&PropValue::String("blue".to_string()));
    let blue_ids = segment.find_nodes_by_prop_hash(1, color_key, blue).unwrap();
    assert!(blue_ids.is_empty());

    let wrong_type_ids = segment.find_nodes_by_prop_hash(99, color_key, red).unwrap();
    assert!(wrong_type_ids.is_empty());
}
/// Ids listed in the legacy property index are filtered out of
/// `find_nodes_by_prop_hash` when the node is tombstoned in the segment.
#[test]
fn test_legacy_prop_index_excludes_tombstoned() {
    use crate::types::{hash_prop_key, hash_prop_value};

    let memtable = Memtable::new();
    for (id, key) in [(1, "a"), (2, "b")] {
        let mut props = BTreeMap::new();
        props.insert("tag".to_string(), PropValue::String("x".to_string()));
        let node = NodeRecord {
            id,
            type_id: 1,
            key: key.to_string(),
            props,
            created_at: 1000,
            updated_at: 1001,
            weight: 0.5,
            dense_vector: None,
            sparse_vector: None,
            last_write_seq: 0,
        };
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }
    let delete_second = WalOp::DeleteNode {
        id: 2,
        deleted_at: 9999,
    };
    memtable.apply_op(&delete_second, 0);

    let tag_key = hash_prop_key("tag");
    let x_value = hash_prop_value(&PropValue::String("x".to_string()));
    // The index still lists both ids; only the tombstone removes id 2.
    let index_groups = [(1, tag_key, x_value, vec![1, 2])];
    let (_tmp_guard, segment) = write_and_open_with_legacy_prop_index(&memtable, &index_groups);

    let live_ids = segment
        .find_nodes_by_prop_hash(1, tag_key, x_value)
        .unwrap();
    assert_eq!(live_ids, vec![1]);
}
/// A secondary equality index on `color` written with the segment answers
/// lookups by value hash: two ids for "red", one for "green".
#[test]
fn test_secondary_eq_sidecar_roundtrip() {
    use crate::types::hash_prop_value;

    let memtable = Memtable::new();
    let fruits = [(1, "apple", "red"), (2, "cherry", "red"), (3, "lime", "green")];
    for (id, key, color) in fruits {
        let mut props = BTreeMap::new();
        props.insert("color".to_string(), PropValue::String(color.to_string()));
        let node = NodeRecord {
            id,
            type_id: 1,
            key: key.to_string(),
            props,
            created_at: 1000,
            updated_at: 1001,
            weight: 0.5,
            dense_vector: None,
            sparse_vector: None,
            last_write_seq: 0,
        };
        memtable.apply_op(&WalOp::UpsertNode(node), 0);
    }

    let color_target = SecondaryIndexTarget::NodeProperty {
        type_id: 1,
        prop_key: "color".to_string(),
    };
    let entry = SecondaryIndexManifestEntry {
        index_id: 41,
        target: color_target,
        kind: SecondaryIndexKind::Equality,
        state: SecondaryIndexState::Building,
        last_error: None,
    };
    let (_tmp_guard, segment) = write_and_open_with_secondary_eq_sidecar(&memtable, &entry);

    let red = hash_prop_value(&PropValue::String("red".to_string()));
    let green = hash_prop_value(&PropValue::String("green".to_string()));

    let mut red_ids = segment
        .find_nodes_by_secondary_eq_index(entry.index_id, red)
        .unwrap();
    red_ids.sort_unstable();
    assert_eq!(red_ids, vec![1, 2]);

    let green_ids = segment
        .find_nodes_by_secondary_eq_index(entry.index_id, green)
        .unwrap();
    assert_eq!(green_ids, vec![3]);
}
/// After the equality sidecar is replaced by garbage and validation fails,
/// replacing it again with a well-formed sidecar makes lookups succeed on the
/// same reader.
#[test]
fn test_secondary_eq_sidecar_cache_reloads_after_validation_failure_and_repair() {
    use crate::types::hash_prop_value;

    let memtable = Memtable::new();
    let mut props = BTreeMap::new();
    props.insert("color".to_string(), PropValue::String("red".to_string()));
    let apple = NodeRecord {
        id: 1,
        type_id: 1,
        key: "apple".to_string(),
        props,
        created_at: 1000,
        updated_at: 1001,
        weight: 0.5,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    };
    memtable.apply_op(&WalOp::UpsertNode(apple), 0);

    let entry = SecondaryIndexManifestEntry {
        index_id: 51,
        target: SecondaryIndexTarget::NodeProperty {
            type_id: 1,
            prop_key: "color".to_string(),
        },
        kind: SecondaryIndexKind::Equality,
        state: SecondaryIndexState::Building,
        last_error: None,
    };
    let (tmp, segment) = write_and_open_with_secondary_eq_sidecar(&memtable, &entry);

    let index_dir = tmp.path().join("seg_0001").join("secondary_indexes");
    let sidecar_path = index_dir.join(format!("node_prop_eq_{}.dat", entry.index_id));
    let red = hash_prop_value(&PropValue::String("red".to_string()));

    // Step 1: swap in a 3-byte file via rename and expect validation to fail.
    let corrupt_path = index_dir.join(".corrupt_eq.dat");
    std::fs::write(&corrupt_path, [1u8, 2, 3]).unwrap();
    std::fs::rename(&corrupt_path, &sidecar_path).unwrap();
    let validation = segment.validate_secondary_eq_sidecar(entry.index_id);
    assert!(validation.is_err());

    // Step 2: swap in a freshly written sidecar and expect lookups to work.
    let repaired_path = index_dir.join(".repaired_eq.dat");
    let mut repaired_groups = BTreeMap::new();
    repaired_groups.insert(red, vec![1]);
    crate::segment_writer::write_node_prop_eq_sidecar_to_path(&repaired_path, &repaired_groups)
        .unwrap();
    std::fs::rename(&repaired_path, &sidecar_path).unwrap();

    let red_ids = segment
        .find_nodes_by_secondary_eq_index(entry.index_id, red)
        .unwrap();
    assert_eq!(red_ids, vec![1]);
}
/// Once a lookup has succeeded, replacing the sidecar file on disk with
/// garbage does not change the result of a second lookup on the same reader.
#[test]
fn test_secondary_eq_sidecar_lookup_uses_validated_cache() {
    use crate::types::hash_prop_value;

    let memtable = Memtable::new();
    let mut props = BTreeMap::new();
    props.insert("color".to_string(), PropValue::String("red".to_string()));
    let apple = NodeRecord {
        id: 1,
        type_id: 1,
        key: "apple".to_string(),
        props,
        created_at: 1000,
        updated_at: 1001,
        weight: 0.5,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    };
    memtable.apply_op(&WalOp::UpsertNode(apple), 0);

    let entry = SecondaryIndexManifestEntry {
        index_id: 52,
        target: SecondaryIndexTarget::NodeProperty {
            type_id: 1,
            prop_key: "color".to_string(),
        },
        kind: SecondaryIndexKind::Equality,
        state: SecondaryIndexState::Building,
        last_error: None,
    };
    let (tmp, segment) = write_and_open_with_secondary_eq_sidecar(&memtable, &entry);

    let index_dir = tmp.path().join("seg_0001").join("secondary_indexes");
    let sidecar_path = index_dir.join(format!("node_prop_eq_{}.dat", entry.index_id));
    let red = hash_prop_value(&PropValue::String("red".to_string()));

    // First lookup against the intact sidecar.
    let before_swap = segment
        .find_nodes_by_secondary_eq_index(entry.index_id, red)
        .unwrap();
    assert_eq!(before_swap, vec![1]);

    // Replace the on-disk file with a 3-byte blob via rename.
    let corrupt_path = index_dir.join(".corrupt_eq.dat");
    std::fs::write(&corrupt_path, [1u8, 2, 3]).unwrap();
    std::fs::rename(&corrupt_path, &sidecar_path).unwrap();

    // Second lookup must still return the original answer.
    let after_swap = segment
        .find_nodes_by_secondary_eq_index(entry.index_id, red)
        .unwrap();
    assert_eq!(after_swap, vec![1]);
}
/// A sidecar whose single group lists ids `2, 1` is rejected with a
/// "node IDs are not strictly increasing" corruption error.
// NOTE(review): field meanings below are inferred from the offsets used in
// this test (8-byte header + 20-byte entries) — confirm against the writer.
#[test]
fn test_validate_secondary_eq_sidecar_rejects_unsorted_node_ids() {
    let data = [
        &1u64.to_le_bytes()[..],  // group count
        &7u64.to_le_bytes()[..],  // value hash
        &28u64.to_le_bytes()[..], // offset of the group's ids
        &2u32.to_le_bytes()[..],  // id count
        &2u64.to_le_bytes()[..],  // first id
        &1u64.to_le_bytes()[..],  // second id, smaller than the first
    ]
    .concat();

    let outcome = validate_secondary_eq_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("node IDs are not strictly increasing"));
    } else {
        panic!(
            "expected corrupt secondary equality sidecar, got {:?}",
            outcome
        );
    }
}
/// A 3-byte buffer is rejected with a "missing header" corruption error.
#[test]
fn test_validate_secondary_eq_sidecar_rejects_missing_header() {
    let outcome = validate_secondary_eq_sidecar_data(&[1u8, 2, 3]);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("missing header"));
    } else {
        panic!(
            "expected corrupt secondary equality sidecar, got {:?}",
            outcome
        );
    }
}
/// A header announcing two groups followed by only one entry is rejected with
/// an "index length" corruption error.
// NOTE(review): field meanings below are inferred from the offsets used in
// this test (8-byte header + 20-byte entries) — confirm against the writer.
#[test]
fn test_validate_secondary_eq_sidecar_rejects_index_length_past_eof() {
    let data = [
        &2u64.to_le_bytes()[..],  // group count (one more than is present)
        &7u64.to_le_bytes()[..],  // value hash
        &28u64.to_le_bytes()[..], // offset of the group's ids
        &1u32.to_le_bytes()[..],  // id count
    ]
    .concat();

    let outcome = validate_secondary_eq_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("index length"));
    } else {
        panic!(
            "expected corrupt secondary equality sidecar, got {:?}",
            outcome
        );
    }
}
/// Two groups sharing the same value hash are rejected with a
/// "value hashes are not strictly increasing" corruption error.
// NOTE(review): field meanings below are inferred from the offsets used in
// this test (8-byte header + 20-byte entries) — confirm against the writer.
#[test]
fn test_validate_secondary_eq_sidecar_rejects_non_increasing_value_hashes() {
    let data = [
        &2u64.to_le_bytes()[..],  // group count
        &7u64.to_le_bytes()[..],  // group 0: value hash
        &48u64.to_le_bytes()[..], // group 0: offset of ids
        &1u32.to_le_bytes()[..],  // group 0: id count
        &7u64.to_le_bytes()[..],  // group 1: value hash, repeats group 0
        &56u64.to_le_bytes()[..], // group 1: offset of ids
        &1u32.to_le_bytes()[..],  // group 1: id count
        &1u64.to_le_bytes()[..],  // id of group 0
        &2u64.to_le_bytes()[..],  // id of group 1
    ]
    .concat();

    let outcome = validate_secondary_eq_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("value hashes are not strictly increasing"));
    } else {
        panic!(
            "expected corrupt secondary equality sidecar, got {:?}",
            outcome
        );
    }
}
/// A group announcing two ids with only one present is rejected with an
/// "exceeds file length" corruption error.
// NOTE(review): field meanings below are inferred from the offsets used in
// this test (8-byte header + 20-byte entries) — confirm against the writer.
#[test]
fn test_validate_secondary_eq_sidecar_rejects_group_past_eof() {
    let data = [
        &1u64.to_le_bytes()[..],  // group count
        &7u64.to_le_bytes()[..],  // value hash
        &28u64.to_le_bytes()[..], // offset of the group's ids
        &2u32.to_le_bytes()[..],  // id count (one more than is present)
        &1u64.to_le_bytes()[..],  // the only id actually stored
    ]
    .concat();

    let outcome = validate_secondary_eq_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("exceeds file length"));
    } else {
        panic!(
            "expected corrupt secondary equality sidecar, got {:?}",
            outcome
        );
    }
}
/// A second group whose id range starts inside the first group's range is
/// rejected with an "overlaps a previous group" corruption error.
// NOTE(review): field meanings below are inferred from the offsets used in
// this test (8-byte header + 20-byte entries) — confirm against the writer.
#[test]
fn test_validate_secondary_eq_sidecar_rejects_overlapping_group_ranges() {
    let data = [
        &2u64.to_le_bytes()[..],  // group count
        &7u64.to_le_bytes()[..],  // group 0: value hash
        &48u64.to_le_bytes()[..], // group 0: offset of ids (two ids: 48..64)
        &2u32.to_le_bytes()[..],  // group 0: id count
        &8u64.to_le_bytes()[..],  // group 1: value hash
        &56u64.to_le_bytes()[..], // group 1: offset of ids, inside 48..64
        &1u32.to_le_bytes()[..],  // group 1: id count
        &1u64.to_le_bytes()[..],  // id payload
        &2u64.to_le_bytes()[..],
        &3u64.to_le_bytes()[..],
    ]
    .concat();

    let outcome = validate_secondary_eq_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("overlaps a previous group"));
    } else {
        panic!(
            "expected corrupt secondary equality sidecar, got {:?}",
            outcome
        );
    }
}
/// After the range sidecar is replaced by garbage and validation fails,
/// replacing it again with a well-formed sidecar makes range lookups succeed
/// on the same reader.
#[test]
fn test_secondary_range_sidecar_cache_reloads_after_validation_failure_and_repair() {
    let memtable = Memtable::new();
    let mut props = BTreeMap::new();
    props.insert("score".to_string(), PropValue::Int(10));
    let apple = NodeRecord {
        id: 1,
        type_id: 1,
        key: "apple".to_string(),
        props,
        created_at: 1000,
        updated_at: 1001,
        weight: 0.5,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    };
    memtable.apply_op(&WalOp::UpsertNode(apple), 0);

    let entry = SecondaryIndexManifestEntry {
        index_id: 61,
        target: SecondaryIndexTarget::NodeProperty {
            type_id: 1,
            prop_key: "score".to_string(),
        },
        kind: SecondaryIndexKind::Range {
            domain: SecondaryIndexRangeDomain::Int,
        },
        state: SecondaryIndexState::Building,
        last_error: None,
    };
    let (tmp, segment) = write_and_open_with_secondary_range_sidecar(&memtable, &entry);

    let index_dir = tmp.path().join("seg_0001").join("secondary_indexes");
    let sidecar_path = index_dir.join(format!("node_prop_range_{}.dat", entry.index_id));

    // Step 1: swap in a 3-byte file via rename and expect validation to fail.
    let corrupt_path = index_dir.join(".corrupt_range.dat");
    std::fs::write(&corrupt_path, [1u8, 2, 3]).unwrap();
    std::fs::rename(&corrupt_path, &sidecar_path).unwrap();
    let validation = segment.validate_secondary_range_sidecar(entry.index_id);
    assert!(validation.is_err());

    // The value 10 with its top bit flipped, as stored in the sidecar entry.
    let encoded_ten = 10u64 ^ (1u64 << 63);

    // Step 2: swap in a freshly written sidecar and expect lookups to work.
    let repaired_path = index_dir.join(".repaired_range.dat");
    crate::segment_writer::write_node_prop_range_sidecar_to_path(
        &repaired_path,
        &[(encoded_ten, 1)],
    )
    .unwrap();
    std::fs::rename(&repaired_path, &sidecar_path).unwrap();

    let matches = segment
        .find_nodes_by_secondary_range_index_if_present(
            entry.index_id,
            Some((encoded_ten, true)),
            Some((encoded_ten, true)),
            None,
        )
        .unwrap();
    assert_eq!(matches, Some(vec![(encoded_ten, 1)]));
}
/// A 3-byte buffer is rejected with a "missing header" corruption error.
#[test]
fn test_validate_secondary_range_sidecar_rejects_missing_header() {
    let outcome = validate_secondary_range_sidecar_data(&[1u8, 2, 3]);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("missing header"));
    } else {
        panic!("expected corrupt secondary range sidecar, got {:?}", outcome);
    }
}
/// One well-formed entry followed by a stray trailing byte is rejected with a
/// "does not match expected fixed-width length" corruption error.
// NOTE(review): field meanings below (count, encoded value, node id) are
// inferred from this test's data — confirm against the range sidecar writer.
#[test]
fn test_validate_secondary_range_sidecar_rejects_length_mismatch() {
    let encoded_ten = 10u64 ^ (1u64 << 63);
    let data = [
        &1u64.to_le_bytes()[..],        // entry count
        &encoded_ten.to_le_bytes()[..], // encoded value
        &1u64.to_le_bytes()[..],        // node id
        &[0xFFu8][..],                  // stray trailing byte
    ]
    .concat();

    let outcome = validate_secondary_range_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("does not match expected fixed-width length"));
    } else {
        panic!("expected corrupt secondary range sidecar, got {:?}", outcome);
    }
}
/// Two entries stored in descending order are rejected with a
/// "not strictly increasing" corruption error.
// NOTE(review): field meanings below (count, encoded value, node id) are
// inferred from this test's data — confirm against the range sidecar writer.
#[test]
fn test_validate_secondary_range_sidecar_rejects_unsorted_entries() {
    let encoded_eleven = 11u64 ^ (1u64 << 63);
    let encoded_ten = 10u64 ^ (1u64 << 63);
    let data = [
        &2u64.to_le_bytes()[..],           // entry count
        &encoded_eleven.to_le_bytes()[..], // entry 0: encoded value
        &2u64.to_le_bytes()[..],           // entry 0: node id
        &encoded_ten.to_le_bytes()[..],    // entry 1: smaller encoded value
        &1u64.to_le_bytes()[..],           // entry 1: node id
    ]
    .concat();

    let outcome = validate_secondary_range_sidecar_data(&data);
    if let Err(EngineError::CorruptRecord(message)) = &outcome {
        assert!(message.contains("not strictly increasing"));
    } else {
        panic!("expected corrupt secondary range sidecar, got {:?}", outcome);
    }
}
/// Edge weights written to the segment are reported unchanged on the
/// neighbor entries returned by `neighbors`.
#[test]
fn test_neighbor_weight_preserved_in_segment() {
    // Edge from node 1 of type 5 with the given id, target and weight.
    let weighted_edge = |id: u64, to: u64, weight: f32| EdgeRecord {
        id,
        from: 1,
        to,
        type_id: 5,
        props: BTreeMap::new(),
        created_at: 100,
        updated_at: 100,
        weight,
        valid_from: 0,
        valid_to: i64::MAX,
        last_write_seq: 0,
    };

    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertNode(make_node(3, 1, "c")),
        WalOp::UpsertEdge(weighted_edge(10, 2, 0.75)),
        WalOp::UpsertEdge(weighted_edge(11, 3, 0.25)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let outgoing = segment.neighbors(1, Direction::Outgoing, None, 0).unwrap();
    assert_eq!(outgoing.len(), 2);
    for entry in &outgoing {
        match entry.edge_id {
            10 => assert!(
                (entry.weight - 0.75).abs() < f32::EPSILON,
                "edge 10 weight: {}",
                entry.weight
            ),
            11 => assert!(
                (entry.weight - 0.25).abs() < f32::EPSILON,
                "edge 11 weight: {}",
                entry.weight
            ),
            _ => panic!("unexpected edge_id: {}", entry.edge_id),
        }
    }
}
/// `edge_by_triple` resolves `(from, to, type_id)` to the matching edge and
/// yields `None` for a wrong type or a reversed direction.
#[test]
fn test_edge_triple_index_roundtrip() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertNode(make_node(3, 1, "c")),
        WalOp::UpsertEdge(make_edge(100, 1, 2, 10)),
        WalOp::UpsertEdge(make_edge(101, 1, 3, 10)),
        WalOp::UpsertEdge(make_edge(102, 2, 3, 20)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let first = segment.edge_by_triple(1, 2, 10).unwrap().unwrap();
    assert_eq!(first.id, 100);
    assert_eq!(first.from, 1);
    assert_eq!(first.to, 2);

    let second = segment.edge_by_triple(1, 3, 10).unwrap().unwrap();
    assert_eq!(second.id, 101);

    let third = segment.edge_by_triple(2, 3, 20).unwrap().unwrap();
    assert_eq!(third.id, 102);

    // Same endpoints as edge 100 but a different type.
    assert!(segment.edge_by_triple(1, 2, 20).unwrap().is_none());
    // Endpoints of edge 100 reversed.
    assert!(segment.edge_by_triple(2, 1, 10).unwrap().is_none());
    // Endpoints of edge 101 reversed.
    assert!(segment.edge_by_triple(3, 1, 10).unwrap().is_none());
}
/// An edge that was upserted and then deleted is not returned by
/// `edge_by_triple`.
#[test]
fn test_edge_triple_index_excludes_tombstoned() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertEdge(make_edge(100, 1, 2, 10)),
        WalOp::DeleteEdge {
            id: 100,
            deleted_at: 9999,
        },
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }

    let (_tmp_guard, segment) = write_and_open(&memtable);

    let lookup = segment.edge_by_triple(1, 2, 10).unwrap();
    assert!(lookup.is_none());
}
/// With a `nodes.dat` that is only eight bytes long, `get_node` must return an
/// error mentioning the bounds problem rather than panic.
#[test]
fn test_truncated_nodes_dat_returns_error() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    std::fs::create_dir_all(&seg_dir).unwrap();
    write_format_ver(&seg_dir);

    // nodes.dat holds nothing but the little-endian bytes of `1u64`
    // (presumably a record count with no records behind it; confirm against
    // the segment layout).
    std::fs::write(seg_dir.join("nodes.dat"), 1u64.to_le_bytes()).unwrap();

    // Every other segment file is created empty.
    let empty_files = [
        "edges.dat",
        "adj_out.idx",
        "adj_out.dat",
        "adj_in.idx",
        "adj_in.dat",
        "key_index.dat",
        "tombstones.dat",
        "node_meta.dat",
        "edge_meta.dat",
        "timestamp_index.dat",
    ];
    for name in empty_files.iter() {
        std::fs::write(seg_dir.join(name), []).unwrap();
    }

    let reader = SegmentReader::open(&seg_dir, 1, None).unwrap();
    let lookup = reader.get_node(42);
    assert!(
        lookup.is_err(),
        "truncated segment should return error, not panic"
    );

    let message = lookup.unwrap_err().to_string();
    assert!(
        message.contains("exceeds data length"),
        "error should describe bounds issue: {}",
        message
    );
}
/// With a `tombstones.dat` that is only eight bytes long, opening the segment
/// must fail with an error rather than panic.
#[test]
fn test_truncated_tombstones_returns_error() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    std::fs::create_dir_all(&seg_dir).unwrap();
    write_format_ver(&seg_dir);

    // All files other than tombstones.dat are created empty.
    let empty_files = [
        "nodes.dat",
        "edges.dat",
        "adj_out.idx",
        "adj_out.dat",
        "adj_in.idx",
        "adj_in.dat",
        "key_index.dat",
        "node_meta.dat",
        "edge_meta.dat",
        "timestamp_index.dat",
    ];
    for name in empty_files.iter() {
        std::fs::write(seg_dir.join(name), []).unwrap();
    }

    // tombstones.dat holds nothing but the little-endian bytes of `5u64`
    // (presumably a count of five with no entries following; confirm against
    // the tombstone file layout).
    std::fs::write(seg_dir.join("tombstones.dat"), 5u64.to_le_bytes()).unwrap();

    let opened = SegmentReader::open(&seg_dir, 1, None);
    assert!(
        opened.is_err(),
        "truncated tombstones should return error, not panic"
    );
}
/// `decode_node_at` must return an error when given a four-byte buffer.
#[test]
fn test_decode_node_at_truncated_returns_error() {
    // The buffer is just the little-endian encoding of `1u32`.
    let truncated: Vec<u8> = 1u32.to_le_bytes().to_vec();
    let decoded = decode_node_at(&truncated, 0, 42);
    assert!(decoded.is_err());
}
/// `decode_edge_at` must return an error when given an eight-byte buffer.
#[test]
fn test_decode_edge_at_truncated_returns_error() {
    // The buffer is just the little-endian encoding of `1u64`.
    let truncated: Vec<u8> = 1u64.to_le_bytes().to_vec();
    let decoded = decode_edge_at(&truncated, 0, 100);
    assert!(decoded.is_err());
}
/// A `format.ver` starting with the bytes `BAAD` must be rejected by
/// `read_format_version` with an "invalid magic" error.
#[test]
fn test_format_version_bad_magic_rejected() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_test");
    std::fs::create_dir_all(&seg_dir).unwrap();

    // Four magic bytes followed by the little-endian `1u32`.
    let contents = [&b"BAAD"[..], &1u32.to_le_bytes()[..]].concat();
    std::fs::write(seg_dir.join("format.ver"), &contents).unwrap();

    let msg = read_format_version(&seg_dir).unwrap_err().to_string();
    assert!(msg.contains("invalid magic"), "got: {}", msg);
}
/// A `format.ver` containing the five bytes `short` must be rejected by
/// `read_format_version` with an "invalid size" error.
#[test]
fn test_format_version_bad_size_rejected() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_test");
    std::fs::create_dir_all(&seg_dir).unwrap();

    let version_path = seg_dir.join("format.ver");
    std::fs::write(version_path, b"short").unwrap();

    let msg = read_format_version(&seg_dir).unwrap_err().to_string();
    assert!(msg.contains("invalid size"), "got: {}", msg);
}
/// A `format.ver` carrying `SEGMENT_FORMAT_VERSION + 1` must be rejected by
/// `read_format_version` with a "newer than supported" error.
#[test]
fn test_format_version_future_version_rejected() {
    use crate::segment_writer::{SEGMENT_FORMAT_VERSION, SEGMENT_MAGIC};

    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_test");
    std::fs::create_dir_all(&seg_dir).unwrap();

    // Valid magic, followed by a version one past the current one.
    let future_version = SEGMENT_FORMAT_VERSION + 1;
    let mut contents = Vec::new();
    contents.extend_from_slice(&SEGMENT_MAGIC);
    contents.extend_from_slice(&future_version.to_le_bytes());
    std::fs::write(seg_dir.join("format.ver"), &contents).unwrap();

    let msg = read_format_version(&seg_dir).unwrap_err().to_string();
    assert!(msg.contains("newer than supported"), "got: {}", msg);
}
/// A segment directory with no `format.ver` at all must be rejected by
/// `read_format_version` with a "too old" error.
#[test]
fn test_format_version_absent_is_rejected() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_test");
    // Only the directory is created; no files are written into it.
    std::fs::create_dir_all(&seg_dir).unwrap();

    let err = read_format_version(&seg_dir).unwrap_err();
    let mentions_too_old = err.to_string().contains("too old");
    assert!(mentions_too_old, "got: {}", err);
}
/// A `format.ver` with the valid magic and version 4 must be rejected by
/// `read_format_version` with a "too old" error.
#[test]
fn test_format_version_v4_rejected() {
    use crate::segment_writer::SEGMENT_MAGIC;

    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_test");
    std::fs::create_dir_all(&seg_dir).unwrap();

    // Valid magic followed by the little-endian `4u32`.
    let mut contents = Vec::new();
    contents.extend_from_slice(&SEGMENT_MAGIC);
    contents.extend_from_slice(&4u32.to_le_bytes());
    std::fs::write(seg_dir.join("format.ver"), &contents).unwrap();

    let err = read_format_version(&seg_dir).unwrap_err();
    let mentions_too_old = err.to_string().contains("too old");
    assert!(mentions_too_old, "got: {}", err);
}
/// Adds a four-byte dense vector blob file to a segment written with no dense
/// config and expects `SegmentReader::open` to reject it.
#[test]
fn test_open_rejects_orphan_vector_blob() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");

    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node(1, 1, "plain")), 0);
    write_segment(&seg_dir, 1, &memtable, None).unwrap();

    // Write the blob file by hand after the segment has been produced.
    let orphan_blob = seg_dir.join(NODE_DENSE_VECTOR_BLOB_FILENAME);
    std::fs::write(orphan_blob, [0u8; 4]).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("vector blobs without node vector metadata"),
        "got: {}",
        err
    );
}
/// Overwrites the first eight bytes of the node vector metadata file with
/// `2u64` and expects `SegmentReader::open` to reject the segment with a
/// "does not match node metadata count" error.
#[test]
fn test_open_rejects_vector_metadata_count_mismatch() {
    let dir = tempfile::tempdir().unwrap();
    let seg_dir = dir.path().join("seg_0001");
    let mt = Memtable::new();
    let dense_config = DenseVectorConfig {
        dimension: 2,
        metric: DenseMetric::Cosine,
        hnsw: HnswConfig::default(),
    };
    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    mt.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &mt, Some(&dense_config)).unwrap();

    // Only one node was written; the leading u64 is presumably the entry
    // count, so 2 no longer agrees with it (confirm against the meta layout).
    let mut meta = std::fs::read(seg_dir.join(NODE_VECTOR_META_FILENAME)).unwrap();
    meta[0..8].copy_from_slice(&2u64.to_le_bytes());
    std::fs::write(seg_dir.join(NODE_VECTOR_META_FILENAME), meta).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&dense_config))
        .err()
        .unwrap();
    // Include the actual error in the failure output, as the sibling tests do.
    assert!(
        err.to_string()
            .contains("does not match node metadata count"),
        "got: {}",
        err
    );
}
/// Appends four extra bytes to the dense vector blob and expects
/// `SegmentReader::open` to reject the segment with a
/// "trailing or unreferenced bytes" error.
#[test]
fn test_open_rejects_vector_blob_with_trailing_bytes() {
    let dir = tempfile::tempdir().unwrap();
    let seg_dir = dir.path().join("seg_0001");
    let mt = Memtable::new();
    let dense_config = dense_config(2);
    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    mt.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &mt, Some(&dense_config)).unwrap();

    // Grow the blob by one extra little-endian f32 after the segment is written.
    let mut dense_blob = std::fs::read(seg_dir.join(NODE_DENSE_VECTOR_BLOB_FILENAME)).unwrap();
    dense_blob.extend_from_slice(&0.9f32.to_le_bytes());
    std::fs::write(seg_dir.join(NODE_DENSE_VECTOR_BLOB_FILENAME), dense_blob).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&dense_config))
        .err()
        .unwrap();
    // Include the actual error in the failure output, as the sibling tests do.
    assert!(
        err.to_string().contains("trailing or unreferenced bytes"),
        "got: {}",
        err
    );
}
/// Opens a segment holding two dense vectors and checks the HNSW header
/// fields, then checks that the raw meta and graph views returned by the
/// reader equal the bytes of the corresponding files on disk.
#[test]
fn test_open_exposes_dense_hnsw_header_for_dense_segments() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(3);

    let mut node_a = make_node(1, 1, "a");
    node_a.dense_vector = Some(vec![0.1, 0.2, 0.3]);
    memtable.apply_op(&WalOp::UpsertNode(node_a), 0);

    let mut node_b = make_node(2, 1, "b");
    node_b.dense_vector = Some(vec![0.3, 0.2, 0.1]);
    memtable.apply_op(&WalOp::UpsertNode(node_b), 0);

    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();
    let reader = SegmentReader::open(&seg_dir, 1, Some(&config)).unwrap();

    let header = reader.dense_hnsw_header().unwrap();
    assert_eq!(header.point_count, 2);
    assert_eq!(header.metric, DenseMetric::Cosine);
    assert_eq!(header.dimension, 3);
    assert_eq!(header.m, config.hnsw.m);

    // The reader's raw views must be byte-for-byte what is stored on disk.
    let meta_on_disk =
        std::fs::read(seg_dir.join(crate::dense_hnsw::DENSE_HNSW_META_FILENAME)).unwrap();
    assert_eq!(reader.raw_dense_hnsw_meta_mmap(), &meta_on_disk);

    let graph_on_disk =
        std::fs::read(seg_dir.join(crate::dense_hnsw::DENSE_HNSW_GRAPH_FILENAME)).unwrap();
    assert_eq!(reader.raw_dense_hnsw_graph_mmap(), &graph_on_disk);
}
/// Removes both dense HNSW files from a segment holding one dense vector and
/// expects `SegmentReader::open` to report them as missing.
#[test]
fn test_open_rejects_missing_dense_hnsw_files_for_dense_segments() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(3);

    let mut node = make_node(1, 1, "a");
    node.dense_vector = Some(vec![0.1, 0.2, 0.3]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();

    // Delete the HNSW meta and graph files written alongside the segment.
    let meta_path = seg_dir.join(crate::dense_hnsw::DENSE_HNSW_META_FILENAME);
    let graph_path = seg_dir.join(crate::dense_hnsw::DENSE_HNSW_GRAPH_FILENAME);
    std::fs::remove_file(meta_path).unwrap();
    std::fs::remove_file(graph_path).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&config))
        .err()
        .unwrap();
    let message = err.to_string();
    assert!(
        message.contains("dense HNSW files are missing for 1 dense vectors"),
        "got: {}",
        err
    );
}
/// For a segment built from a single `make_node` node with no vector set, the
/// reader must expose no HNSW header and empty raw meta and graph views.
#[test]
fn test_open_keeps_dense_hnsw_empty_for_vectorless_segments() {
    let memtable = Memtable::new();
    let plain = make_node(1, 1, "plain");
    memtable.apply_op(&WalOp::UpsertNode(plain), 0);

    let (_dir, reader) = write_and_open(&memtable);

    assert!(reader.dense_hnsw_header().is_none());
    assert!(reader.raw_dense_hnsw_meta_mmap().is_empty());
    assert!(reader.raw_dense_hnsw_graph_mmap().is_empty());
}
/// Rewrites bytes 4..8 of `format.ver` to `6u32` on a segment that has dense
/// HNSW files and expects `SegmentReader::open` to reject those files.
#[test]
fn test_open_rejects_dense_hnsw_files_in_v6_segment() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(2);

    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();

    // Bytes 4..8 are presumably the version field following a 4-byte magic
    // (confirm against the format.ver layout).
    let version_path = seg_dir.join("format.ver");
    let mut version_bytes = std::fs::read(&version_path).unwrap();
    version_bytes[4..8].copy_from_slice(&6u32.to_le_bytes());
    std::fs::write(&version_path, version_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&config))
        .err()
        .unwrap();
    let message = err.to_string();
    assert!(
        message.contains("unexpected dense HNSW files for format version 6"),
        "got: {}",
        err
    );
}
/// Overwrites byte 26 of the dense HNSW meta file and expects
/// `SegmentReader::open` to report a metric mismatch.
#[test]
fn test_open_rejects_dense_hnsw_metric_mismatch() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(2);

    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();

    // Byte 26 is presumably the metric tag, with 1 naming a metric other than
    // the configured one (confirm against the HNSW meta header layout).
    let meta_path = seg_dir.join(crate::dense_hnsw::DENSE_HNSW_META_FILENAME);
    let mut meta_bytes = std::fs::read(&meta_path).unwrap();
    meta_bytes[26] = 1;
    std::fs::write(&meta_path, meta_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&config))
        .err()
        .unwrap();
    let message = err.to_string();
    assert!(
        message.contains("does not match configured metric"),
        "got: {}",
        err
    );
}
/// Removes both sparse posting files from a segment whose node carries a
/// sparse vector and expects `SegmentReader::open` to report them missing.
#[test]
fn test_open_rejects_missing_sparse_posting_files_for_sparse_segments() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();

    let mut node = make_node(1, 1, "sparse");
    node.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, None).unwrap();

    // Delete the posting index and the postings file.
    let index_path = seg_dir.join(crate::sparse_postings::SPARSE_POSTING_INDEX_FILENAME);
    let postings_path = seg_dir.join(crate::sparse_postings::SPARSE_POSTINGS_FILENAME);
    std::fs::remove_file(index_path).unwrap();
    std::fs::remove_file(postings_path).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("segment has sparse vectors but sparse posting files are missing"),
        "got: {}",
        err
    );
}
/// Rewrites bytes 4..8 of `format.ver` to `7u32` on a segment that has sparse
/// posting files and expects `SegmentReader::open` to reject those files.
#[test]
fn test_open_rejects_sparse_posting_files_in_v7_segment() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();

    let mut node = make_node(1, 1, "sparse");
    node.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, None).unwrap();

    // Bytes 4..8 are presumably the version field following a 4-byte magic
    // (confirm against the format.ver layout).
    let version_path = seg_dir.join("format.ver");
    let mut version_bytes = std::fs::read(&version_path).unwrap();
    version_bytes[4..8].copy_from_slice(&7u32.to_le_bytes());
    std::fs::write(&version_path, version_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("unexpected sparse posting files for format version 7"),
        "got: {}",
        err
    );
}
/// Removes the sparse posting files and rewrites bytes 4..8 of `format.ver`
/// to `7u32`; a segment that still carries sparse vectors must then be
/// rejected with a "predates sparse posting support" error.
#[test]
fn test_open_rejects_sparse_vectors_in_v7_segment_without_postings() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();

    let mut node = make_node(1, 1, "sparse");
    node.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, None).unwrap();

    // Drop both posting files.
    let index_path = seg_dir.join(crate::sparse_postings::SPARSE_POSTING_INDEX_FILENAME);
    let postings_path = seg_dir.join(crate::sparse_postings::SPARSE_POSTINGS_FILENAME);
    std::fs::remove_file(index_path).unwrap();
    std::fs::remove_file(postings_path).unwrap();

    // Then relabel the segment with version 7.
    let version_path = seg_dir.join("format.ver");
    let mut version_bytes = std::fs::read(&version_path).unwrap();
    version_bytes[4..8].copy_from_slice(&7u32.to_le_bytes());
    std::fs::write(&version_path, version_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("predates sparse posting support"),
        "got: {}",
        err
    );
}
/// Overwrites bytes 8..12 of the sparse postings file with `9.0f32` and
/// expects `SegmentReader::open` to report that the postings no longer match
/// the sparse vector payloads.
#[test]
fn test_open_rejects_sparse_posting_parity_mismatch() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();

    let mut node = make_node(1, 1, "sparse");
    node.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, None).unwrap();

    // Bytes 8..12 presumably hold a posting weight; 9.0 matches neither
    // weight in the node's sparse vector (confirm against the postings layout).
    let postings_path = seg_dir.join(crate::sparse_postings::SPARSE_POSTINGS_FILENAME);
    let mut posting_bytes = std::fs::read(&postings_path).unwrap();
    posting_bytes[8..12].copy_from_slice(&9.0f32.to_le_bytes());
    std::fs::write(&postings_path, posting_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("does not match sparse vector payloads"),
        "got: {}",
        err
    );
}
/// Overwrites bytes 4..8 of the sparse vector blob with `-1.5f32` and expects
/// `SegmentReader::open` to report a negative weight.
#[test]
fn test_open_rejects_sparse_vector_blob_with_negative_weight() {
    let mut sparse_node = make_node(1, 1, "sparse-negative");
    sparse_node.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    let (_dir, seg_dir) = write_sparse_segment(vec![sparse_node]);

    // Bytes 4..8 presumably hold the first stored weight (confirm against the
    // sparse blob layout).
    let blob_path = seg_dir.join(crate::segment_writer::NODE_SPARSE_VECTOR_BLOB_FILENAME);
    let mut blob_bytes = std::fs::read(&blob_path).unwrap();
    blob_bytes[4..8].copy_from_slice(&(-1.5f32).to_le_bytes());
    std::fs::write(&blob_path, blob_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(message.contains("has negative weight"), "got: {}", err);
}
/// Overwrites bytes 24..28 of the sparse posting index with `9u32` and
/// expects `SegmentReader::open` to report that dimension 7 is missing from
/// the posting files.
#[test]
fn test_open_rejects_sparse_postings_missing_expected_dimension() {
    let mut sparse_node = make_node(1, 1, "sparse-missing-dim");
    sparse_node.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    let (_dir, seg_dir) = write_sparse_segment(vec![sparse_node]);

    // Bytes 24..28 presumably hold the dimension id of the second index
    // entry (7), replaced here by 9 (confirm against the index layout).
    let posting_index_path =
        seg_dir.join(crate::sparse_postings::SPARSE_POSTING_INDEX_FILENAME);
    let mut index_bytes = std::fs::read(&posting_index_path).unwrap();
    index_bytes[24..28].copy_from_slice(&9u32.to_le_bytes());
    std::fs::write(&posting_index_path, index_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("sparse posting files are missing dimension 7 from sparse vectors"),
        "got: {}",
        err
    );
}
/// Patches the posting index and rebuilds the postings file from two 12-byte
/// slices, then expects `SegmentReader::open` to report that dimension 2 has
/// one posting while the sparse vector payloads contain two.
#[test]
fn test_open_rejects_sparse_posting_count_mismatch() {
    let mut node_a = make_node(1, 1, "sparse-count-a");
    node_a.sparse_vector = Some(vec![(2, 1.5), (7, 0.25)]);
    let mut node_b = make_node(2, 1, "sparse-count-b");
    node_b.sparse_vector = Some(vec![(2, 0.5)]);
    let (_dir, seg_dir) = write_sparse_segment(vec![node_a, node_b]);

    let posting_index_path =
        seg_dir.join(crate::sparse_postings::SPARSE_POSTING_INDEX_FILENAME);
    let posting_data_path = seg_dir.join(crate::sparse_postings::SPARSE_POSTINGS_FILENAME);

    // Presumably 20..24 is dimension 2's posting count and 28..36 the offset
    // of the following entry (confirm against the index layout).
    let mut index_bytes = std::fs::read(&posting_index_path).unwrap();
    index_bytes[20..24].copy_from_slice(&1u32.to_le_bytes());
    index_bytes[28..36].copy_from_slice(&12u64.to_le_bytes());
    std::fs::write(&posting_index_path, index_bytes).unwrap();

    // Keep bytes 0..12 and 24..36 of the postings, dropping the 12 in between.
    let original_postings = std::fs::read(&posting_data_path).unwrap();
    let trimmed_postings: Vec<u8> =
        [&original_postings[0..12], &original_postings[24..36]].concat();
    std::fs::write(&posting_data_path, trimmed_postings).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains(
            "sparse posting dimension 2 count 1 does not match sparse vector payload count 2"
        ),
        "got: {}",
        err
    );
}
/// Overwrites bytes 22..24 of the dense HNSW meta file with the configured
/// `m` plus one and expects `SegmentReader::open` to report the mismatch.
#[test]
fn test_open_rejects_dense_hnsw_hnsw_param_mismatch() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(2);

    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();

    // Bytes 22..24 presumably store `m` (confirm against the HNSW meta
    // header layout).
    let meta_path = seg_dir.join(crate::dense_hnsw::DENSE_HNSW_META_FILENAME);
    let mut meta_bytes = std::fs::read(&meta_path).unwrap();
    let wrong_m = config.hnsw.m + 1;
    meta_bytes[22..24].copy_from_slice(&wrong_m.to_le_bytes());
    std::fs::write(&meta_path, meta_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&config))
        .err()
        .unwrap();
    let message = err.to_string();
    assert!(
        message.contains("does not match configured m"),
        "got: {}",
        err
    );
}
/// Overwrites bytes 28..32 of the dense HNSW meta file with `3u32` while the
/// config uses dimension 2, and expects `SegmentReader::open` to report the
/// mismatch.
#[test]
fn test_open_rejects_dense_hnsw_dimension_mismatch() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(2);

    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();

    // Bytes 28..32 presumably store the dimension (confirm against the HNSW
    // meta header layout).
    let meta_path = seg_dir.join(crate::dense_hnsw::DENSE_HNSW_META_FILENAME);
    let mut meta_bytes = std::fs::read(&meta_path).unwrap();
    meta_bytes[28..32].copy_from_slice(&3u32.to_le_bytes());
    std::fs::write(&meta_path, meta_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&config))
        .err()
        .unwrap();
    let message = err.to_string();
    assert!(
        message.contains("does not match configured dimension"),
        "got: {}",
        err
    );
}
/// Overwrites bytes 24..26 of the dense HNSW meta file with the configured
/// `ef_construction` plus one and expects `SegmentReader::open` to report the
/// mismatch.
#[test]
fn test_open_rejects_dense_hnsw_ef_construction_mismatch() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let config = dense_config(2);

    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&config)).unwrap();

    // Bytes 24..26 presumably store `ef_construction` (confirm against the
    // HNSW meta header layout).
    let meta_path = seg_dir.join(crate::dense_hnsw::DENSE_HNSW_META_FILENAME);
    let mut meta_bytes = std::fs::read(&meta_path).unwrap();
    let wrong_ef_construction = config.hnsw.ef_construction + 1;
    meta_bytes[24..26].copy_from_slice(&wrong_ef_construction.to_le_bytes());
    std::fs::write(&meta_path, meta_bytes).unwrap();

    let err = SegmentReader::open(&seg_dir, 1, Some(&config))
        .err()
        .unwrap();
    let message = err.to_string();
    assert!(
        message.contains("does not match configured ef_construction"),
        "got: {}",
        err
    );
}
/// Writes a segment with a dense config, then opens it passing `None` as the
/// dense config and expects an error asking for `DbOptions::dense_vector`.
#[test]
fn test_open_rejects_dense_hnsw_without_dense_config() {
    let tmp = tempfile::tempdir().unwrap();
    let seg_dir = tmp.path().join("seg_0001");
    let memtable = Memtable::new();
    let write_config = dense_config(2);

    let mut node = make_node(1, 1, "vector");
    node.dense_vector = Some(vec![0.1, 0.2]);
    memtable.apply_op(&WalOp::UpsertNode(node), 0);
    write_segment(&seg_dir, 1, &memtable, Some(&write_config)).unwrap();

    // The config used for writing is deliberately not passed to `open`.
    let err = SegmentReader::open(&seg_dir, 1, None).err().unwrap();
    let message = err.to_string();
    assert!(
        message.contains("require DbOptions::dense_vector to be configured"),
        "got: {}",
        err
    );
}
/// Writes two nodes and checks the fields returned by `node_meta_at` for both
/// sidecar entries.
#[test]
fn test_sidecar_node_meta_roundtrip() {
    let memtable = Memtable::new();
    memtable.apply_op(&WalOp::UpsertNode(make_node_with_props(1, 1, "alice")), 0);
    memtable.apply_op(&WalOp::UpsertNode(make_node(2, 2, "bob")), 0);
    let (_dir, reader) = write_and_open(&memtable);

    assert_eq!(reader.node_meta_count(), 2);

    // Entry 0: the node created by `make_node_with_props`.
    let (
        first_id,
        _off,
        _len,
        first_type,
        first_updated_at,
        first_weight,
        first_key_len,
        _pho,
        first_phc,
        _lws,
    ) = reader.node_meta_at(0).unwrap();
    assert_eq!(first_id, 1);
    assert_eq!(first_type, 1);
    assert_eq!(first_updated_at, 2000);
    assert!((first_weight - 0.75).abs() < f32::EPSILON);
    // "alice" is five bytes long.
    assert_eq!(first_key_len, 5);
    assert_eq!(first_phc, 0);

    // Entry 1: the node created by `make_node`.
    let (second_id, _, _, second_type, _, _, second_key_len, _, second_phc, _) =
        reader.node_meta_at(1).unwrap();
    assert_eq!(second_id, 2);
    assert_eq!(second_type, 2);
    // "bob" is three bytes long.
    assert_eq!(second_key_len, 3);
    assert_eq!(second_phc, 0);
}
/// Writes one edge between two nodes and checks the fields returned by
/// `edge_meta_at` for the single sidecar entry.
#[test]
fn test_sidecar_edge_meta_roundtrip() {
    let memtable = Memtable::new();
    let ops = [
        WalOp::UpsertNode(make_node(1, 1, "a")),
        WalOp::UpsertNode(make_node(2, 1, "b")),
        WalOp::UpsertEdge(make_edge(10, 1, 2, 5)),
    ];
    for op in &ops {
        memtable.apply_op(op, 0);
    }
    let (_dir, reader) = write_and_open(&memtable);

    assert_eq!(reader.edge_meta_count(), 1);

    let (
        edge_id,
        _off,
        _len,
        from_id,
        to_id,
        type_id,
        updated_at,
        weight,
        valid_from,
        valid_to,
        _lws,
    ) = reader.edge_meta_at(0).unwrap();
    assert_eq!(edge_id, 10);
    assert_eq!(from_id, 1);
    assert_eq!(to_id, 2);
    assert_eq!(type_id, 5);
    assert_eq!(updated_at, 2001);
    assert!((weight - 1.0).abs() < f32::EPSILON);
    assert_eq!(valid_from, 0);
    assert_eq!(valid_to, i64::MAX);
}
/// For every node sidecar entry, decodes the node at the recorded data offset
/// in the reader's `nodes_mmap` and checks that its id and type id match the
/// sidecar, and that the recorded data length is non-zero.
#[test]
fn test_sidecar_data_offset_matches_nodes_dat() {
    let mt = Memtable::new();
    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 0);
    mt.apply_op(&WalOp::UpsertNode(make_node(2, 2, "bb")), 0);
    let (_dir, reader) = write_and_open(&mt);

    // Guard against a vacuous pass: with zero sidecar entries the loop below
    // would never execute and the test would succeed without checking anything.
    assert_eq!(reader.node_meta_count(), 2);

    for i in 0..reader.node_meta_count() as usize {
        let (nid, data_offset, data_len, tid, _, _, _, _, _, _) =
            reader.node_meta_at(i).unwrap();
        // The node is decoded straight from the recorded offset.
        let node = decode_node_at(&reader.nodes_mmap, data_offset as usize, nid).unwrap();
        assert_eq!(node.id, nid);
        assert_eq!(node.type_id, tid);
        assert!(data_len > 0);
    }
}
}