use crate::error::EngineError;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{BuildHasherDefault, Hasher};
/// Identity-style [`Hasher`] for integer node and edge ids.
///
/// Starting from the default (zero) state, a single integer write
/// (`write_u64` / `write_u32` / `write_usize`) makes [`Hasher::finish`] return
/// that integer unchanged, so `u64` ids act as their own hash.
///
/// Later writes are folded into the existing state rather than replacing it.
/// Key types whose `Hash` impl performs several writes (tuples, or `str` with
/// its trailing terminator byte) therefore still hash every component instead
/// of collapsing onto the last write.
#[doc(hidden)]
#[derive(Default)]
pub struct NodeIdHasher(u64);
impl NodeIdHasher {
    /// Folds `value` into the running state.
    ///
    /// From the zero state this yields exactly `value`, which keeps
    /// single-integer keys identity-hashed.
    #[inline]
    fn mix(&mut self, value: u64) {
        self.0 = self.0.rotate_left(5) ^ value;
    }
}
impl Hasher for NodeIdHasher {
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
    /// Consumes `bytes` as little-endian 8-byte words and folds each word into
    /// the state.
    ///
    /// The final short word is zero-padded. An empty slice is a no-op.
    fn write(&mut self, bytes: &[u8]) {
        for chunk in bytes.chunks(8) {
            let mut word = [0u8; 8];
            word[..chunk.len()].copy_from_slice(chunk);
            self.mix(u64::from_le_bytes(word));
        }
    }
    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.mix(i);
    }
    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.mix(u64::from(i));
    }
    #[inline]
    fn write_usize(&mut self, i: usize) {
        // Lossless on every target where usize is at most 64 bits wide.
        self.mix(i as u64);
    }
}
/// `BuildHasher` that creates a fresh [`NodeIdHasher`] (via `Default`) for every hash.
#[doc(hidden)]
pub type NodeIdBuildHasher = BuildHasherDefault<NodeIdHasher>;
/// `HashMap` keyed by `u64` ids and hashed with [`NodeIdHasher`].
pub type NodeIdMap<V> = HashMap<u64, V, NodeIdBuildHasher>;
/// `HashSet` of `u64` ids hashed with [`NodeIdHasher`].
pub type NodeIdSet = HashSet<u64, NodeIdBuildHasher>;
/// Dynamically typed property value stored in node and edge `props` maps.
///
/// [`hash_prop_value`] hashes the MessagePack encoding produced by this type's
/// derived `Serialize`. Renaming or reordering variants may therefore change
/// previously computed hashes; do not reorder without checking persisted data.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum PropValue {
Null,
Bool(bool),
Int(i64),
UInt(u64),
Float(f64),
String(String),
Bytes(Vec<u8>),
Array(Vec<PropValue>),
Map(BTreeMap<String, PropValue>),
}
/// 64-bit FNV-1a hash of `bytes`.
///
/// Uses the standard 64-bit offset basis and prime, so results match the
/// published FNV-1a test vectors (e.g. `b"a"` hashes to `0xaf63dc4c8601ec8c`).
fn fnv1a(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
/// Hashes a property key by running FNV-1a over its UTF-8 bytes.
pub fn hash_prop_key(key: &str) -> u64 {
    let utf8 = key.as_bytes();
    fnv1a(utf8)
}
/// Hashes a property value with FNV-1a over its MessagePack (`rmp_serde`) encoding.
///
/// # Panics
///
/// Panics if `rmp_serde` fails to encode the value. The code treats that case
/// as an invariant violation.
pub fn hash_prop_value(value: &PropValue) -> u64 {
    fnv1a(
        rmp_serde::to_vec(value)
            .expect("PropValue must be serializable")
            .as_slice(),
    )
}
/// Dense embedding: one `f32` per configured dimension (length is checked by
/// [`validate_dense_vector`]).
pub type DenseVector = Vec<f32>;
/// Sparse embedding as `(dimension_id, weight)` pairs.
///
/// See [`canonicalize_sparse_vector`] for the canonical form: sorted by
/// dimension, duplicates summed, zero weights removed.
pub type SparseVector = Vec<(u32, f32)>;
/// Metric used to compare dense vectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DenseMetric {
Cosine,
Euclidean,
DotProduct,
}
/// HNSW index build parameters.
///
/// [`validate_dense_vector_config`] requires `m > 0`, `ef_construction > 0`
/// and `ef_construction >= m`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HnswConfig {
pub m: u16,
pub ef_construction: u16,
}
/// Default `ef_search` for dense queries.
///
/// NOTE(review): presumably used when `VectorSearchRequest::ef_search` is
/// `None`; confirm at the call site.
pub const DEFAULT_DENSE_EF_SEARCH: usize = 128;
impl Default for HnswConfig {
fn default() -> Self {
Self {
m: 16,
ef_construction: 200,
}
}
}
/// Dense-vector settings.
///
/// `hnsw` falls back to `HnswConfig::default()` when it is absent during
/// deserialization (`#[serde(default)]`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DenseVectorConfig {
pub dimension: u32,
pub metric: DenseMetric,
#[serde(default)]
pub hnsw: HnswConfig,
}
/// Which vector representation(s) a search uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum VectorSearchMode {
Dense,
Sparse,
Hybrid,
}
/// Strategy for combining result lists (presumably dense and sparse in
/// `Hybrid` mode). Defaults to `WeightedRankFusion`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum FusionMode {
#[default]
WeightedRankFusion,
ReciprocalRankFusion,
WeightedScoreFusion,
}
/// Graph-neighborhood restriction for a vector search.
///
/// NOTE(review): presumably limits candidates to nodes reachable from
/// `start_node_id` within `max_depth`; confirm in the search implementation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VectorSearchScope {
pub start_node_id: u64,
pub max_depth: u32,
pub direction: Direction,
pub edge_type_filter: Option<Vec<u32>>,
pub at_epoch: Option<i64>,
}
/// Parameters of a vector search request.
///
/// NOTE(review): `None` fields presumably fall back to engine defaults (e.g.
/// `DEFAULT_DENSE_EF_SEARCH`); confirm in the search implementation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VectorSearchRequest {
pub mode: VectorSearchMode,
pub dense_query: Option<DenseVector>,
pub sparse_query: Option<SparseVector>,
pub k: usize,
pub type_filter: Option<Vec<u32>>,
pub ef_search: Option<usize>,
pub scope: Option<VectorSearchScope>,
pub dense_weight: Option<f32>,
pub sparse_weight: Option<f32>,
pub fusion_mode: Option<FusionMode>,
}
/// One vector search result: a node id and its score.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VectorHit {
pub node_id: u64,
pub score: f32,
}
/// Checks that a dense-vector configuration is usable.
///
/// # Errors
///
/// Returns `EngineError::InvalidOperation` for the first rule that fails, in
/// this order:
/// 1. `dimension == 0`
/// 2. `hnsw.m == 0`
/// 3. `hnsw.ef_construction == 0`
/// 4. `hnsw.ef_construction < hnsw.m`
pub fn validate_dense_vector_config(config: &DenseVectorConfig) -> Result<(), EngineError> {
    let hnsw = &config.hnsw;
    let violation = if config.dimension == 0 {
        Some(String::from("dense vector dimension must be > 0"))
    } else if hnsw.m == 0 {
        Some(String::from("dense HNSW m must be > 0"))
    } else if hnsw.ef_construction == 0 {
        Some(String::from("dense HNSW ef_construction must be > 0"))
    } else if hnsw.ef_construction < hnsw.m {
        Some(format!(
            "dense HNSW ef_construction ({}) must be >= m ({})",
            hnsw.ef_construction, hnsw.m
        ))
    } else {
        None
    };
    match violation {
        Some(message) => Err(EngineError::InvalidOperation(message)),
        None => Ok(()),
    }
}
/// Rejects NaN and ±infinity.
///
/// `context` names the vector kind and is used as the prefix of the error
/// message.
fn validate_finite_vector_component(value: f32, context: &str) -> Result<(), EngineError> {
    if value.is_finite() {
        Ok(())
    } else {
        Err(EngineError::InvalidOperation(format!(
            "{context} contains NaN or infinite value"
        )))
    }
}
/// Validates a dense vector against `config`.
///
/// # Errors
///
/// Returns `EngineError::InvalidOperation` in any of these cases:
/// - the config itself is invalid (see [`validate_dense_vector_config`]);
/// - the length differs from `config.dimension`;
/// - any component is NaN or infinite.
pub fn validate_dense_vector(
    values: &[f32],
    config: &DenseVectorConfig,
) -> Result<(), EngineError> {
    validate_dense_vector_config(config)?;
    let expected_len = config.dimension as usize;
    if values.len() != expected_len {
        return Err(EngineError::InvalidOperation(format!(
            "dense vector length {} does not match configured dimension {}",
            values.len(),
            config.dimension
        )));
    }
    values
        .iter()
        .try_for_each(|&component| validate_finite_vector_component(component, "dense vector"))
}
/// Puts a sparse vector into canonical form, reusing the input allocation.
///
/// Steps:
/// 1. Every weight is validated (finite, then non-negative), in input order.
/// 2. Entries are sorted by dimension id.
/// 3. Entries with the same dimension are summed left to right.
/// 4. Zero-weight entries are dropped.
/// 5. The merged weights are re-checked, since a sum can overflow to infinity.
///
/// Returns `Ok(None)` when nothing remains.
///
/// # Errors
///
/// `EngineError::InvalidOperation` for NaN/infinite or negative input weights,
/// or for merged weights that overflow.
fn canonicalize_sparse_vector_entries(
    mut entries: SparseVector,
) -> Result<Option<SparseVector>, EngineError> {
    if entries.is_empty() {
        return Ok(None);
    }
    for weight in entries.iter().map(|&(_, w)| w) {
        validate_finite_vector_component(weight, "sparse vector")?;
        if weight < 0.0 {
            return Err(EngineError::InvalidOperation(
                "sparse vector weights must be non-negative".into(),
            ));
        }
    }
    entries.sort_unstable_by_key(|&(dimension_id, _)| dimension_id);
    // `dedup_by` hands us (later, kept): fold the later weight into the kept
    // entry so the summation order matches a left-to-right scan.
    entries.dedup_by(|later, kept| {
        let same_dimension = later.0 == kept.0;
        if same_dimension {
            kept.1 += later.1;
        }
        same_dimension
    });
    entries.retain(|&(_, weight)| weight != 0.0);
    if entries.is_empty() {
        return Ok(None);
    }
    entries
        .iter()
        .try_for_each(|&(_, weight)| validate_finite_vector_component(weight, "sparse vector"))?;
    Ok(Some(entries))
}
/// Canonicalizes a borrowed sparse vector.
///
/// The input is copied first; the result is sorted by dimension, merged, and
/// free of zero weights. Returns `Ok(None)` when the canonical form is empty.
///
/// # Errors
///
/// `EngineError::InvalidOperation` for NaN/infinite or negative weights, or if
/// merging overflows.
pub fn canonicalize_sparse_vector(
    values: &[(u32, f32)],
) -> Result<Option<SparseVector>, EngineError> {
    let owned: SparseVector = values.into();
    canonicalize_sparse_vector_entries(owned)
}
/// Owned counterpart of [`canonicalize_sparse_vector`].
///
/// Reuses the caller's allocation instead of copying.
pub fn canonicalize_sparse_vector_owned(
    values: SparseVector,
) -> Result<Option<SparseVector>, EngineError> {
    let entries = values;
    canonicalize_sparse_vector_entries(entries)
}
/// Metadata kept for a deleted record: deletion time and the write sequence
/// number associated with it.
#[derive(Debug, Clone, Copy)]
pub struct TombstoneEntry {
pub deleted_at: i64,
pub last_write_seq: u64,
}
/// Stored node.
///
/// Fields marked `#[serde(default)]` deserialize to their `Default` value when
/// absent. Presumably this keeps records written before those fields existed
/// readable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeRecord {
pub id: u64,
pub type_id: u32,
pub key: String,
pub props: BTreeMap<String, PropValue>,
pub created_at: i64,
pub updated_at: i64,
pub weight: f32,
#[serde(default)]
pub dense_vector: Option<DenseVector>,
#[serde(default)]
pub sparse_vector: Option<SparseVector>,
#[serde(default)]
pub last_write_seq: u64,
}
/// Stored edge from `from` to `to`.
///
/// `valid_from`/`valid_to` bound the edge's validity window; the exact interval
/// semantics are defined by the readers, not here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeRecord {
pub id: u64,
pub from: u64,
pub to: u64,
pub type_id: u32,
pub props: BTreeMap<String, PropValue>,
pub created_at: i64,
pub updated_at: i64,
pub weight: f32,
pub valid_from: i64,
pub valid_to: i64,
#[serde(default)]
pub last_write_seq: u64,
}
/// Id-cursor pagination request; both the page size and the cursor are optional.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PageRequest {
pub limit: Option<usize>,
pub after: Option<u64>,
}
/// One page of results plus the cursor for the next page.
///
/// `next_cursor` is presumably `None` on the last page.
#[derive(Debug, Clone)]
pub struct PageResult<T> {
pub items: Vec<T>,
pub next_cursor: Option<u64>,
}
/// Node query built from explicit ids/keys, a type, and an optional filter.
///
/// The `Default` impl uses no constraints, `NodeQueryOrder::NodeIdAsc`, and
/// `allow_full_scan = false`.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeQuery {
pub type_id: Option<u32>,
pub ids: Vec<u64>,
pub keys: Vec<String>,
pub filter: Option<NodeFilterExpr>,
pub page: PageRequest,
pub order: NodeQueryOrder,
pub allow_full_scan: bool,
}
impl Default for NodeQuery {
fn default() -> Self {
Self {
type_id: None,
ids: Vec::new(),
keys: Vec::new(),
filter: None,
page: PageRequest::default(),
order: NodeQueryOrder::NodeIdAsc,
allow_full_scan: false,
}
}
}
/// Boolean filter over node properties and update time, composable with
/// `And`, `Or` and `Not`.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeFilterExpr {
/// Property `key` equals `value`.
PropertyEquals {
key: String,
value: PropValue,
},
/// Property `key` equals one of `values`.
PropertyIn {
key: String,
values: Vec<PropValue>,
},
/// Property `key` lies within the optional `lower` / `upper` bounds.
PropertyRange {
key: String,
lower: Option<PropertyRangeBound>,
upper: Option<PropertyRangeBound>,
},
/// Property `key` is present.
PropertyExists {
key: String,
},
/// Property `key` is absent.
PropertyMissing {
key: String,
},
/// Update time within optional millisecond bounds.
///
/// Presumably compared against `NodeRecord::updated_at`; bound inclusivity is
/// defined by the evaluator.
UpdatedAtRange {
lower_ms: Option<i64>,
upper_ms: Option<i64>,
},
And(Vec<NodeFilterExpr>),
Or(Vec<NodeFilterExpr>),
Not(Box<NodeFilterExpr>),
}
/// Result ordering for node queries; currently only ascending node id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeQueryOrder {
NodeIdAsc,
}
/// Node-id page returned by a node query.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryNodeIdsResult {
pub items: Vec<u64>,
pub next_cursor: Option<u64>,
}
/// Full-record page returned by a node query.
#[derive(Debug, Clone)]
pub struct QueryNodesResult {
pub items: Vec<NodeRecord>,
pub next_cursor: Option<u64>,
}
/// Graph pattern query: aliased node patterns connected by edge patterns,
/// returning at most `limit` matches.
#[derive(Debug, Clone, PartialEq)]
pub struct GraphPatternQuery {
pub nodes: Vec<NodePattern>,
pub edges: Vec<EdgePattern>,
pub at_epoch: Option<i64>,
pub limit: usize,
pub order: PatternOrder,
}
/// Constraints on one aliased node in a pattern.
#[derive(Debug, Clone, PartialEq)]
pub struct NodePattern {
pub alias: String,
pub type_id: Option<u32>,
pub ids: Vec<u64>,
pub keys: Vec<String>,
pub filter: Option<NodeFilterExpr>,
}
/// Edge between two node aliases; the edge alias itself is optional.
#[derive(Debug, Clone, PartialEq)]
pub struct EdgePattern {
pub alias: Option<String>,
pub from_alias: String,
pub to_alias: String,
pub direction: Direction,
pub type_filter: Option<Vec<u32>>,
pub property_predicates: Vec<EdgePostFilterPredicate>,
}
/// Predicate on edge properties (presumably applied after expansion; see
/// `QueryPlanWarning::EdgePropertyPostFilter`).
#[derive(Debug, Clone, PartialEq)]
pub enum EdgePostFilterPredicate {
PropertyEquals {
key: String,
value: PropValue,
},
PropertyRange {
key: String,
lower: Option<PropertyRangeBound>,
upper: Option<PropertyRangeBound>,
},
}
/// Ordering of pattern matches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PatternOrder {
AnchorThenAliasesAsc,
}
/// Pattern query output; `truncated` flags that more matches may exist.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryPatternResult {
pub matches: Vec<QueryMatch>,
pub truncated: bool,
}
/// One match: node ids and edge ids keyed by their pattern alias.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryMatch {
pub nodes: BTreeMap<String, u64>,
pub edges: BTreeMap<String, u64>,
}
/// Which kind of query a plan describes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryPlanKind {
NodeQuery,
PatternQuery,
}
/// Explained query plan: the operator tree, an optional candidate estimate,
/// and planner warnings.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryPlan {
pub kind: QueryPlanKind,
pub root: QueryPlanNode,
pub estimated_candidates: Option<u64>,
pub warnings: Vec<QueryPlanWarning>,
}
/// Node of a query plan tree.
///
/// Leaf variants name candidate sources; the others combine or verify the
/// candidates of their inputs.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryPlanNode {
ExplicitIds,
KeyLookup,
NodeTypeIndex,
PropertyEqualityIndex,
PropertyRangeIndex,
TimestampIndex,
AdjacencyExpansion,
Intersect {
inputs: Vec<QueryPlanNode>,
},
Union {
inputs: Vec<QueryPlanNode>,
},
VerifyNodeFilter {
input: Box<QueryPlanNode>,
},
VerifyEdgePredicates {
input: Box<QueryPlanNode>,
},
PatternExpand {
anchor_alias: String,
input: Box<QueryPlanNode>,
},
FallbackTypeScan,
FallbackFullNodeScan,
EmptyResult,
}
/// Diagnostic emitted by the planner (index availability, fallbacks, caps,
/// budgets).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryPlanWarning {
MissingReadyIndex,
UsingFallbackScan,
FullScanRequiresOptIn,
FullScanExplicitlyAllowed,
UnboundedPatternRejected,
EdgePropertyPostFilter,
IndexSkippedAsBroad,
CandidateCapExceeded,
RangeCandidateCapExceeded,
TimestampCandidateCapExceeded,
VerifyOnlyFilter,
BooleanBranchFallback,
PlanningProbeBudgetExceeded,
}
/// Numeric domain in which a range index orders property values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SecondaryIndexRangeDomain {
Int,
UInt,
Float,
}
/// Secondary index flavor: equality lookups, or ordered range scans in `domain`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SecondaryIndexKind {
Equality,
Range { domain: SecondaryIndexRangeDomain },
}
/// What a secondary index covers: one property key of one node type.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SecondaryIndexTarget {
NodeProperty { type_id: u32, prop_key: String },
}
/// Lifecycle state of a secondary index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SecondaryIndexState {
Building,
Ready,
Failed,
}
/// Persisted manifest entry for a secondary index.
///
/// `last_error` defaults to `None` when absent during deserialization.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SecondaryIndexManifestEntry {
pub index_id: u64,
pub target: SecondaryIndexTarget,
pub kind: SecondaryIndexKind,
pub state: SecondaryIndexState,
#[serde(default)]
pub last_error: Option<String>,
}
/// Caller-facing description of a node-property secondary index.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodePropertyIndexInfo {
pub index_id: u64,
pub type_id: u32,
pub prop_key: String,
pub kind: SecondaryIndexKind,
pub state: SecondaryIndexState,
pub last_error: Option<String>,
}
/// One end of a property range: the value and whether it is included.
#[derive(Debug, Clone, PartialEq)]
pub enum PropertyRangeBound {
Included(PropValue),
Excluded(PropValue),
}
impl PropertyRangeBound {
pub fn value(&self) -> &PropValue {
match self {
PropertyRangeBound::Included(value) | PropertyRangeBound::Excluded(value) => value,
}
}
pub fn is_inclusive(&self) -> bool {
matches!(self, PropertyRangeBound::Included(_))
}
}
/// Resume position for a property range scan: the last `(value, node_id)` seen.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PropertyRangeCursor {
pub value: PropValue,
pub node_id: u64,
}
/// Page request for a property range scan.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PropertyRangePageRequest {
pub limit: Option<usize>,
pub after: Option<PropertyRangeCursor>,
}
/// One page of a property range scan plus the cursor for the next page.
#[derive(Debug, Clone, PartialEq)]
pub struct PropertyRangePageResult<T> {
pub items: Vec<T>,
pub next_cursor: Option<PropertyRangeCursor>,
}
/// Operation recorded in the write-ahead log; the kind is tagged by `OpTag`.
#[derive(Debug, Clone)]
pub(crate) enum WalOp {
UpsertNode(NodeRecord),
UpsertEdge(EdgeRecord),
DeleteNode { id: u64, deleted_at: i64 },
DeleteEdge { id: u64, deleted_at: i64 },
}
/// One-byte tag identifying the kind of a `WalOp`.
///
/// NOTE(review): presumably written ahead of each serialized WAL op; confirm
/// in the WAL codec. The discriminants are part of that byte format.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum OpTag {
    UpsertNode = 1,
    UpsertEdge = 2,
    DeleteNode = 3,
    DeleteEdge = 4,
}
impl OpTag {
    /// Decodes a tag byte. Returns `None` for anything outside `1..=4`.
    pub(crate) fn from_u8(v: u8) -> Option<OpTag> {
        const KNOWN: [OpTag; 4] = [
            OpTag::UpsertNode,
            OpTag::UpsertEdge,
            OpTag::DeleteNode,
            OpTag::DeleteEdge,
        ];
        KNOWN.iter().copied().find(|&tag| tag as u8 == v)
    }
}
/// Per-segment record counts as stored in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentInfo {
pub id: u64,
pub node_count: u64,
pub edge_count: u64,
}
/// Persisted manifest state.
///
/// Fields marked `#[serde(default)]` take their `Default` value when missing.
/// Presumably this lets manifests written before those fields existed still
/// load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestState {
pub version: u32,
pub segments: Vec<SegmentInfo>,
pub next_node_id: u64,
pub next_edge_id: u64,
#[serde(default)]
pub dense_vector: Option<DenseVectorConfig>,
#[serde(default)]
pub prune_policies: BTreeMap<String, PrunePolicy>,
#[serde(default)]
pub next_engine_seq: u64,
#[serde(default)]
pub next_wal_generation_id: u64,
#[serde(default)]
pub active_wal_generation_id: u64,
#[serde(default)]
pub pending_flush_epochs: Vec<FlushEpochMeta>,
#[serde(default)]
pub secondary_indexes: Vec<SecondaryIndexManifestEntry>,
#[serde(default)]
pub next_secondary_index_id: u64,
}
/// Progress of a flush epoch between freezing and WAL retirement.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FlushEpochState {
FrozenPendingFlush,
PublishedPendingRetire,
}
/// Manifest record for an in-flight flush epoch.
///
/// `segment_id` is presumably set once the output segment exists.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlushEpochMeta {
pub epoch_id: u64,
pub wal_generation_id: u64,
pub state: FlushEpochState,
pub segment_id: Option<u64>,
}
/// Stage reported by compaction progress updates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionPhase {
CollectingTombstones,
MergingNodes,
MergingEdges,
WritingOutput,
}
/// Snapshot of compaction progress (segments and records processed vs. total).
#[derive(Debug, Clone)]
pub struct CompactionProgress {
pub phase: CompactionPhase,
pub segments_processed: usize,
pub total_segments: usize,
pub records_processed: u64,
pub total_records: u64,
}
/// Summary of a finished compaction.
#[derive(Debug, Clone)]
pub struct CompactionStats {
pub segments_merged: usize,
pub nodes_kept: u64,
pub nodes_removed: u64,
pub edges_kept: u64,
pub edges_removed: u64,
pub duration_ms: u64,
pub output_segment_id: u64,
pub nodes_auto_pruned: u64,
pub edges_auto_pruned: u64,
}
/// Node to upsert as part of a `GraphPatch`.
#[derive(Debug, Clone)]
pub struct NodeInput {
pub type_id: u32,
pub key: String,
pub props: BTreeMap<String, PropValue>,
pub weight: f32,
pub dense_vector: Option<DenseVector>,
pub sparse_vector: Option<SparseVector>,
}
/// Optional payload for a node upsert.
///
/// `Default` gives empty `props`, `weight = 1.0`, and no dense or sparse vector.
#[derive(Debug, Clone, PartialEq)]
pub struct UpsertNodeOptions {
    pub props: BTreeMap<String, PropValue>,
    pub weight: f32,
    pub dense_vector: Option<DenseVector>,
    pub sparse_vector: Option<SparseVector>,
}
impl Default for UpsertNodeOptions {
    fn default() -> Self {
        let props = BTreeMap::new();
        UpsertNodeOptions {
            props,
            weight: 1.0,
            dense_vector: None,
            sparse_vector: None,
        }
    }
}
/// Optional payload for an edge upsert.
///
/// `Default` gives empty `props`, `weight = 1.0`, and no explicit validity bounds.
#[derive(Debug, Clone, PartialEq)]
pub struct UpsertEdgeOptions {
    pub props: BTreeMap<String, PropValue>,
    pub weight: f32,
    pub valid_from: Option<i64>,
    pub valid_to: Option<i64>,
}
impl Default for UpsertEdgeOptions {
    fn default() -> Self {
        UpsertEdgeOptions {
            valid_to: None,
            valid_from: None,
            weight: 1.0,
            props: BTreeMap::new(),
        }
    }
}
/// Transaction-local handle for an entity not yet committed: either a numeric
/// slot or a string alias.
///
/// Derives `Ord`, so variant order affects sorting (e.g. in `BTreeMap` keys).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TxnLocalRef {
Slot(u32),
Alias(String),
}
/// Reference to a node inside a transaction: by id, by `(type_id, key)`, or by
/// a transaction-local handle.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TxnNodeRef {
Id(u64),
Key { type_id: u32, key: String },
Local(TxnLocalRef),
}
/// Reference to an edge inside a transaction: by id, by
/// `(from, to, type_id)`, or by a transaction-local handle.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TxnEdgeRef {
Id(u64),
Triple {
from: TxnNodeRef,
to: TxnNodeRef,
type_id: u32,
},
Local(TxnLocalRef),
}
/// One staged operation within a transaction.
#[derive(Debug, Clone, PartialEq)]
pub enum TxnIntent {
UpsertNode {
alias: Option<String>,
type_id: u32,
key: String,
options: UpsertNodeOptions,
},
UpsertEdge {
alias: Option<String>,
from: TxnNodeRef,
to: TxnNodeRef,
type_id: u32,
options: UpsertEdgeOptions,
},
DeleteNode {
target: TxnNodeRef,
},
DeleteEdge {
target: TxnEdgeRef,
},
InvalidateEdge {
target: TxnEdgeRef,
valid_to: i64,
},
}
/// Node as seen from inside a transaction.
///
/// `id` and the timestamps are optional, presumably because uncommitted nodes
/// do not have them yet.
#[derive(Debug, Clone, PartialEq)]
pub struct TxnNodeView {
pub id: Option<u64>,
pub local: Option<TxnLocalRef>,
pub type_id: u32,
pub key: String,
pub props: BTreeMap<String, PropValue>,
pub created_at: Option<i64>,
pub updated_at: Option<i64>,
pub weight: f32,
pub dense_vector: Option<DenseVector>,
pub sparse_vector: Option<SparseVector>,
}
/// Edge as seen from inside a transaction; endpoints are `TxnNodeRef`s.
#[derive(Debug, Clone, PartialEq)]
pub struct TxnEdgeView {
pub id: Option<u64>,
pub local: Option<TxnLocalRef>,
pub from: TxnNodeRef,
pub to: TxnNodeRef,
pub type_id: u32,
pub props: BTreeMap<String, PropValue>,
pub created_at: Option<i64>,
pub updated_at: Option<i64>,
pub weight: f32,
pub valid_from: Option<i64>,
pub valid_to: Option<i64>,
}
/// Ids produced by a committed transaction, plus the mapping from
/// transaction-local handles to the ids they received.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TxnCommitResult {
pub node_ids: Vec<u64>,
pub edge_ids: Vec<u64>,
pub local_node_ids: BTreeMap<TxnLocalRef, u64>,
pub local_edge_ids: BTreeMap<TxnLocalRef, u64>,
}
impl TxnCommitResult {
    /// Resolves a node reference against this commit.
    ///
    /// - `Id` resolves to itself.
    /// - `Local` is looked up in `local_node_ids`.
    /// - `Key` cannot be resolved here and yields `None`.
    pub fn node_id(&self, target: &TxnNodeRef) -> Option<u64> {
        match *target {
            TxnNodeRef::Key { .. } => None,
            TxnNodeRef::Id(id) => Some(id),
            TxnNodeRef::Local(ref handle) => self.local_node_ids.get(handle).copied(),
        }
    }
    /// Resolves an edge reference against this commit.
    ///
    /// - `Id` resolves to itself.
    /// - `Local` is looked up in `local_edge_ids`.
    /// - `Triple` yields `None`.
    pub fn edge_id(&self, target: &TxnEdgeRef) -> Option<u64> {
        match *target {
            TxnEdgeRef::Triple { .. } => None,
            TxnEdgeRef::Id(id) => Some(id),
            TxnEdgeRef::Local(ref handle) => self.local_edge_ids.get(handle).copied(),
        }
    }
}
/// Options for neighbor lookups.
///
/// `Default`: outgoing direction, with no type filter, limit, `at_epoch`, or
/// decay.
#[derive(Debug, Clone, PartialEq)]
pub struct NeighborOptions {
    pub direction: Direction,
    pub type_filter: Option<Vec<u32>>,
    pub limit: Option<usize>,
    pub at_epoch: Option<i64>,
    pub decay_lambda: Option<f32>,
}
impl Default for NeighborOptions {
    fn default() -> Self {
        NeighborOptions {
            decay_lambda: None,
            at_epoch: None,
            limit: None,
            type_filter: None,
            direction: Direction::Outgoing,
        }
    }
}
/// Options for degree counting.
///
/// `Default`: outgoing direction, no type filter, no `at_epoch`.
#[derive(Debug, Clone, PartialEq)]
pub struct DegreeOptions {
    pub direction: Direction,
    pub type_filter: Option<Vec<u32>>,
    pub at_epoch: Option<i64>,
}
impl Default for DegreeOptions {
    fn default() -> Self {
        let direction = Direction::Outgoing;
        DegreeOptions {
            direction,
            type_filter: None,
            at_epoch: None,
        }
    }
}
/// Options for top-k neighbor selection.
///
/// `Default`: outgoing direction, no type filter, `ScoringMode::Weight`, no
/// `at_epoch`.
#[derive(Debug, Clone, PartialEq)]
pub struct TopKOptions {
    pub direction: Direction,
    pub type_filter: Option<Vec<u32>>,
    pub scoring: ScoringMode,
    pub at_epoch: Option<i64>,
}
impl Default for TopKOptions {
    fn default() -> Self {
        TopKOptions {
            at_epoch: None,
            scoring: ScoringMode::Weight,
            type_filter: None,
            direction: Direction::Outgoing,
        }
    }
}
/// Options for graph traversal.
///
/// `Default`: `min_depth = 1`, outgoing direction, and no filters, epoch,
/// decay, limit, or cursor.
#[derive(Debug, Clone, PartialEq)]
pub struct TraverseOptions {
    pub min_depth: u32,
    pub direction: Direction,
    pub edge_type_filter: Option<Vec<u32>>,
    pub node_type_filter: Option<Vec<u32>>,
    pub at_epoch: Option<i64>,
    pub decay_lambda: Option<f64>,
    pub limit: Option<usize>,
    pub cursor: Option<TraversalCursor>,
}
impl Default for TraverseOptions {
    fn default() -> Self {
        let min_depth = 1;
        let direction = Direction::Outgoing;
        TraverseOptions {
            min_depth,
            direction,
            edge_type_filter: None,
            node_type_filter: None,
            at_epoch: None,
            decay_lambda: None,
            limit: None,
            cursor: None,
        }
    }
}
/// Options for subgraph extraction.
///
/// `Default`: outgoing direction, no edge type filter, no `at_epoch`.
#[derive(Debug, Clone, PartialEq)]
pub struct SubgraphOptions {
    pub direction: Direction,
    pub edge_type_filter: Option<Vec<u32>>,
    pub at_epoch: Option<i64>,
}
impl Default for SubgraphOptions {
    fn default() -> Self {
        SubgraphOptions {
            at_epoch: None,
            edge_type_filter: None,
            direction: Direction::Outgoing,
        }
    }
}
/// Options for single shortest-path search.
///
/// `Default`: outgoing direction, with no type filter, weight field, epoch,
/// depth limit, or cost limit.
#[derive(Debug, Clone, PartialEq)]
pub struct ShortestPathOptions {
    pub direction: Direction,
    pub type_filter: Option<Vec<u32>>,
    pub weight_field: Option<String>,
    pub at_epoch: Option<i64>,
    pub max_depth: Option<u32>,
    pub max_cost: Option<f64>,
}
impl Default for ShortestPathOptions {
    fn default() -> Self {
        let direction = Direction::Outgoing;
        ShortestPathOptions {
            direction,
            max_cost: None,
            max_depth: None,
            at_epoch: None,
            weight_field: None,
            type_filter: None,
        }
    }
}
/// Options for enumerating all shortest paths.
///
/// `Default`: outgoing direction, and every optional limit or filter unset.
#[derive(Debug, Clone, PartialEq)]
pub struct AllShortestPathsOptions {
    pub direction: Direction,
    pub type_filter: Option<Vec<u32>>,
    pub weight_field: Option<String>,
    pub at_epoch: Option<i64>,
    pub max_depth: Option<u32>,
    pub max_cost: Option<f64>,
    pub max_paths: Option<usize>,
}
impl Default for AllShortestPathsOptions {
    fn default() -> Self {
        AllShortestPathsOptions {
            max_paths: None,
            max_cost: None,
            max_depth: None,
            at_epoch: None,
            weight_field: None,
            type_filter: None,
            direction: Direction::Outgoing,
        }
    }
}
/// Options for reachability checks.
///
/// `Default`: outgoing direction, no type filter, no `at_epoch`, no depth limit.
#[derive(Debug, Clone, PartialEq)]
pub struct IsConnectedOptions {
    pub direction: Direction,
    pub type_filter: Option<Vec<u32>>,
    pub at_epoch: Option<i64>,
    pub max_depth: Option<u32>,
}
impl Default for IsConnectedOptions {
    fn default() -> Self {
        let direction = Direction::Outgoing;
        IsConnectedOptions {
            direction,
            max_depth: None,
            at_epoch: None,
            type_filter: None,
        }
    }
}
/// Options for connected-component computation; all filters are optional.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct ComponentOptions {
pub edge_type_filter: Option<Vec<u32>>,
pub node_type_filter: Option<Vec<u32>>,
pub at_epoch: Option<i64>,
}
/// Edge to upsert as part of a `GraphPatch`, with endpoints given as node ids.
#[derive(Debug, Clone)]
pub struct EdgeInput {
pub from: u64,
pub to: u64,
pub type_id: u32,
pub props: BTreeMap<String, PropValue>,
pub weight: f32,
pub valid_from: Option<i64>,
pub valid_to: Option<i64>,
}
/// Batch of graph mutations.
///
/// `invalidate_edges` holds `(edge_id, timestamp)` pairs; the meaning of the
/// timestamp is presumably the new `valid_to` (confirm in the patch applier).
#[derive(Debug, Clone, Default)]
pub struct GraphPatch {
pub upsert_nodes: Vec<NodeInput>,
pub upsert_edges: Vec<EdgeInput>,
pub invalidate_edges: Vec<(u64, i64)>,
pub delete_node_ids: Vec<u64>,
pub delete_edge_ids: Vec<u64>,
}
/// Ids assigned to the nodes and edges upserted by a `GraphPatch`.
#[derive(Debug, Clone)]
pub struct PatchResult {
pub node_ids: Vec<u64>,
pub edge_ids: Vec<u64>,
}
/// Pruning criteria, persisted in `ManifestState::prune_policies`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrunePolicy {
pub max_age_ms: Option<i64>,
pub max_weight: Option<f32>,
pub type_id: Option<u32>,
}
/// Counts of records removed by a prune.
#[derive(Debug, Clone)]
pub struct PruneResult {
pub nodes_pruned: u64,
pub edges_pruned: u64,
}
/// Point-in-time engine statistics (WAL, segments, tombstones, memtables).
#[derive(Debug, Clone)]
pub struct DbStats {
pub pending_wal_bytes: usize,
pub segment_count: usize,
pub node_tombstone_count: usize,
pub edge_tombstone_count: usize,
pub last_compaction_ms: Option<i64>,
pub wal_sync_mode: String,
pub active_memtable_bytes: usize,
pub immutable_memtable_bytes: usize,
pub immutable_memtable_count: usize,
pub pending_flush_count: usize,
pub active_wal_generation_id: u64,
pub oldest_retained_wal_generation_id: u64,
}
/// Edge direction relative to a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
Outgoing,
Incoming,
Both,
}
/// One adjacent node together with the connecting edge's id, type, weight and
/// validity bounds.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NeighborEntry {
pub node_id: u64,
pub edge_id: u64,
pub edge_type_id: u32,
pub weight: f32,
pub valid_from: i64,
pub valid_to: i64,
}
/// Node reached during traversal: its depth, the edge it was reached through,
/// and an optional score.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TraversalHit {
pub node_id: u64,
pub depth: u32,
pub via_edge_id: Option<u64>,
pub score: Option<f64>,
}
/// Resume position for paginated traversal.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraversalCursor {
pub depth: u32,
pub last_node_id: u64,
}
/// One page of traversal hits plus the cursor for the next page.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TraversalPageResult {
pub items: Vec<TraversalHit>,
pub next_cursor: Option<TraversalCursor>,
}
/// How top-k neighbors are scored.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ScoringMode {
Weight,
Recency,
DecayAdjusted { lambda: f32 },
}
/// Path as parallel node and edge id lists, plus its total cost.
#[derive(Debug, Clone, PartialEq)]
pub struct ShortestPath {
pub nodes: Vec<u64>,
pub edges: Vec<u64>,
pub total_cost: f64,
}
/// Materialized subgraph: full node and edge records.
#[derive(Debug, Clone)]
pub struct Subgraph {
pub nodes: Vec<NodeRecord>,
pub edges: Vec<EdgeRecord>,
}
/// Personalized PageRank options.
///
/// `Default`:
/// - exact power iteration
/// - damping 0.85, 20 max iterations, epsilon 1e-6
/// - approximate residual tolerance 1e-5
/// - no edge type filter and no result cap
#[derive(Debug, Clone)]
pub struct PprOptions {
    pub algorithm: PprAlgorithm,
    pub damping_factor: f64,
    pub max_iterations: u32,
    pub epsilon: f64,
    pub approx_residual_tolerance: f64,
    pub edge_type_filter: Option<Vec<u32>>,
    pub max_results: Option<usize>,
}
/// PPR algorithm: exact power iteration or approximate forward push.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PprAlgorithm {
    ExactPowerIteration,
    ApproxForwardPush,
}
impl Default for PprOptions {
    fn default() -> Self {
        Self {
            max_results: None,
            edge_type_filter: None,
            approx_residual_tolerance: 1e-5,
            epsilon: 1e-6,
            max_iterations: 20,
            damping_factor: 0.85,
            algorithm: PprAlgorithm::ExactPowerIteration,
        }
    }
}
/// Diagnostics reported for approximate (forward-push) PPR runs.
#[derive(Debug, Clone, PartialEq)]
pub struct PprApproxMeta {
pub residual_tolerance: f64,
pub pushes: u64,
pub max_remaining_residual: f64,
}
/// PPR output as `(node_id, score)` pairs plus convergence info.
///
/// `approx` is presumably `Some` only for `PprAlgorithm::ApproxForwardPush`.
#[derive(Debug, Clone)]
pub struct PprResult {
pub scores: Vec<(u64, f64)>,
pub iterations: u32,
pub converged: bool,
pub algorithm: PprAlgorithm,
pub approx: Option<PprApproxMeta>,
}
/// Options for adjacency export.
///
/// `Default`: no node or edge type filters, and weights included.
#[derive(Debug, Clone)]
pub struct ExportOptions {
    pub node_type_filter: Option<Vec<u32>>,
    pub edge_type_filter: Option<Vec<u32>>,
    pub include_weights: bool,
}
impl Default for ExportOptions {
    fn default() -> Self {
        ExportOptions {
            include_weights: true,
            edge_type_filter: None,
            node_type_filter: None,
        }
    }
}
/// Exported edge tuple.
///
/// NOTE(review): presumably `(from, to, type_id, weight)`; confirm in the
/// exporter.
pub type ExportEdge = (u64, u64, u32, f32);
/// Adjacency export: the exported node ids and edge tuples.
#[derive(Debug, Clone)]
pub struct AdjacencyExport {
pub node_ids: Vec<u64>,
pub edges: Vec<ExportEdge>,
}
/// WAL durability mode.
///
/// `Immediate` presumably syncs on every write. `GroupCommit` batches syncs by
/// time interval and buffered-byte thresholds; the exact threshold semantics
/// are defined by the WAL writer, not here.
#[derive(Debug, Clone, PartialEq)]
pub enum WalSyncMode {
    Immediate,
    GroupCommit {
        interval_ms: u64,
        soft_trigger_bytes: usize,
        hard_cap_bytes: usize,
    },
}
impl Default for WalSyncMode {
    /// Group commit every 50 ms, with a 2 MiB soft trigger and a 16 MiB hard cap.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        WalSyncMode::GroupCommit {
            interval_ms: 50,
            soft_trigger_bytes: 2 * MIB,
            hard_cap_bytes: 16 * MIB,
        }
    }
}
/// Options for opening a database.
///
/// `Default` values:
/// - `create_if_missing = true`
/// - 128 MiB memtable flush threshold
/// - `edge_uniqueness = false`
/// - no dense-vector config
/// - compaction after 4 flushes
/// - `WalSyncMode::default()`
/// - 512 MiB memtable hard cap
/// - at most 4 immutable memtables
#[derive(Debug, Clone)]
pub struct DbOptions {
    pub create_if_missing: bool,
    pub memtable_flush_threshold: usize,
    pub edge_uniqueness: bool,
    pub dense_vector: Option<DenseVectorConfig>,
    pub compact_after_n_flushes: u32,
    pub wal_sync_mode: WalSyncMode,
    pub memtable_hard_cap_bytes: usize,
    pub max_immutable_memtables: usize,
}
impl Default for DbOptions {
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        DbOptions {
            create_if_missing: true,
            memtable_flush_threshold: 128 * MIB,
            edge_uniqueness: false,
            dense_vector: None,
            compact_after_n_flushes: 4,
            wal_sync_mode: WalSyncMode::default(),
            memtable_hard_cap_bytes: 512 * MIB,
            max_immutable_memtables: 4,
        }
    }
}
// Unit tests for this module's defaults, serde round-trips, WAL tag decoding,
// and dense/sparse vector validation helpers.
#[cfg(test)]
mod tests {
use super::*;
// DbOptions::default() must keep the documented default values.
#[test]
fn test_default_db_options() {
let opts = DbOptions::default();
assert!(opts.create_if_missing);
assert_eq!(opts.memtable_flush_threshold, 128 * 1024 * 1024);
assert!(!opts.edge_uniqueness);
assert!(opts.dense_vector.is_none());
assert_eq!(opts.compact_after_n_flushes, 4);
assert!(matches!(
opts.wal_sync_mode,
WalSyncMode::GroupCommit {
interval_ms: 50,
soft_trigger_bytes: 2097152,
hard_cap_bytes: 16777216,
}
));
assert_eq!(opts.memtable_hard_cap_bytes, 512 * 1024 * 1024);
assert_eq!(opts.max_immutable_memtables, 4);
}
#[test]
fn test_prop_value_equality() {
assert_eq!(PropValue::Null, PropValue::Null);
assert_eq!(PropValue::Bool(true), PropValue::Bool(true));
assert_ne!(PropValue::Int(1), PropValue::Int(2));
assert_eq!(
PropValue::String("hello".to_string()),
PropValue::String("hello".to_string())
);
assert_eq!(
PropValue::Array(vec![PropValue::Int(1), PropValue::Int(2)]),
PropValue::Array(vec![PropValue::Int(1), PropValue::Int(2)])
);
}
#[test]
fn test_prop_value_map() {
let mut inner = BTreeMap::new();
inner.insert("nested_key".to_string(), PropValue::Int(42));
inner.insert("flag".to_string(), PropValue::Bool(true));
let map = PropValue::Map(inner.clone());
assert_eq!(map, PropValue::Map(inner));
}
// Nested maps/arrays must survive a MessagePack round-trip (the encoding
// also used by hash_prop_value).
#[test]
#[allow(clippy::approx_constant)]
fn test_prop_value_map_msgpack_roundtrip() {
let mut inner = BTreeMap::new();
inner.insert("x".to_string(), PropValue::Float(3.14));
inner.insert("label".to_string(), PropValue::String("hello".into()));
inner.insert(
"items".to_string(),
PropValue::Array(vec![PropValue::Int(1), PropValue::Int(2)]),
);
let mut nested = BTreeMap::new();
nested.insert("deep".to_string(), PropValue::Bool(false));
inner.insert("child".to_string(), PropValue::Map(nested));
let map = PropValue::Map(inner);
let bytes = rmp_serde::to_vec(&map).expect("serialize");
let decoded: PropValue = rmp_serde::from_slice(&bytes).expect("deserialize");
assert_eq!(map, decoded);
}
// Tags 1..=4 decode to the matching discriminant; everything else is rejected.
#[test]
fn test_op_tag_roundtrip() {
for tag_val in 1u8..=4 {
let tag = OpTag::from_u8(tag_val).unwrap();
assert_eq!(tag as u8, tag_val);
}
assert!(OpTag::from_u8(0).is_none());
assert!(OpTag::from_u8(5).is_none());
assert!(OpTag::from_u8(255).is_none());
}
#[test]
fn test_direction_serde_roundtrip() {
for dir in [Direction::Outgoing, Direction::Incoming, Direction::Both] {
let json = serde_json::to_string(&dir).unwrap();
let back: Direction = serde_json::from_str(&json).unwrap();
assert_eq!(dir, back);
}
}
#[test]
fn test_neighbor_entry_serde_roundtrip() {
let entry = NeighborEntry {
node_id: 42,
edge_id: 99,
edge_type_id: 7,
weight: 0.75,
valid_from: 1000,
valid_to: i64::MAX,
};
let json = serde_json::to_string(&entry).unwrap();
let back: NeighborEntry = serde_json::from_str(&json).unwrap();
assert_eq!(entry, back);
}
// ManifestState (including the dense-vector config) round-trips through JSON.
#[test]
fn test_manifest_state_serde() {
let state = ManifestState {
version: 1,
segments: vec![
SegmentInfo {
id: 1,
node_count: 100,
edge_count: 200,
},
SegmentInfo {
id: 2,
node_count: 50,
edge_count: 75,
},
],
next_node_id: 151,
next_edge_id: 276,
dense_vector: Some(DenseVectorConfig {
dimension: 384,
metric: DenseMetric::Cosine,
hnsw: HnswConfig::default(),
}),
prune_policies: BTreeMap::new(),
next_engine_seq: 0,
next_wal_generation_id: 0,
active_wal_generation_id: 0,
pending_flush_epochs: Vec::new(),
secondary_indexes: Vec::new(),
next_secondary_index_id: 1,
};
let json = serde_json::to_string(&state).unwrap();
let loaded: ManifestState = serde_json::from_str(&json).unwrap();
assert_eq!(loaded.version, 1);
assert_eq!(loaded.segments.len(), 2);
assert_eq!(loaded.next_node_id, 151);
assert_eq!(loaded.next_edge_id, 276);
assert_eq!(loaded.dense_vector, state.dense_vector);
}
// Zero dimension and ef_construction < m are both rejected.
#[test]
fn test_validate_dense_vector_config_rejects_invalid_values() {
let err = validate_dense_vector_config(&DenseVectorConfig {
dimension: 0,
metric: DenseMetric::Cosine,
hnsw: HnswConfig::default(),
})
.unwrap_err();
assert!(matches!(err, EngineError::InvalidOperation(_)));
let err = validate_dense_vector_config(&DenseVectorConfig {
dimension: 8,
metric: DenseMetric::Cosine,
hnsw: HnswConfig {
m: 32,
ef_construction: 16,
},
})
.unwrap_err();
assert!(matches!(err, EngineError::InvalidOperation(_)));
}
#[test]
fn test_validate_dense_vector_rejects_wrong_length_and_non_finite_values() {
let config = DenseVectorConfig {
dimension: 3,
metric: DenseMetric::DotProduct,
hnsw: HnswConfig::default(),
};
let err = validate_dense_vector(&[1.0, 2.0], &config).unwrap_err();
assert!(matches!(err, EngineError::InvalidOperation(_)));
let err = validate_dense_vector(&[1.0, f32::NAN, 3.0], &config).unwrap_err();
assert!(matches!(err, EngineError::InvalidOperation(_)));
}
// Duplicate dimensions are summed, zeros dropped, output sorted by dimension.
#[test]
fn test_canonicalize_sparse_vector_sorts_merges_and_drops_zeros() {
let canonical = canonicalize_sparse_vector(&[
(9, 0.0),
(4, 1.5),
(2, 2.0),
(4, 0.5),
(2, 0.0),
(7, 3.0),
(4, 1.0),
])
.unwrap()
.unwrap();
assert_eq!(canonical, vec![(2, 2.0), (4, 3.0), (7, 3.0)]);
}
#[test]
fn test_canonicalize_sparse_vector_rejects_non_finite_values() {
let err = canonicalize_sparse_vector(&[(1, f32::INFINITY)]).unwrap_err();
assert!(matches!(err, EngineError::InvalidOperation(_)));
}
#[test]
fn test_canonicalize_sparse_vector_rejects_negative_values() {
let err = canonicalize_sparse_vector(&[(1, -0.25)]).unwrap_err();
assert!(matches!(err, EngineError::InvalidOperation(_)));
assert!(err
.to_string()
.contains("sparse vector weights must be non-negative"));
}
// The remaining tests pin the Default impls of the option/paging types.
#[test]
fn test_upsert_node_options_default() {
let opts = UpsertNodeOptions::default();
assert!(opts.props.is_empty());
assert_eq!(opts.weight, 1.0);
assert!(opts.dense_vector.is_none());
assert!(opts.sparse_vector.is_none());
}
#[test]
fn test_upsert_edge_options_default() {
let opts = UpsertEdgeOptions::default();
assert!(opts.props.is_empty());
assert_eq!(opts.weight, 1.0);
assert!(opts.valid_from.is_none());
assert!(opts.valid_to.is_none());
}
#[test]
fn test_neighbor_options_default() {
let opts = NeighborOptions::default();
assert_eq!(opts.direction, Direction::Outgoing);
assert!(opts.type_filter.is_none());
assert!(opts.limit.is_none());
assert!(opts.at_epoch.is_none());
assert!(opts.decay_lambda.is_none());
}
#[test]
fn test_degree_options_default() {
let opts = DegreeOptions::default();
assert_eq!(opts.direction, Direction::Outgoing);
assert!(opts.type_filter.is_none());
assert!(opts.at_epoch.is_none());
}
#[test]
fn test_traverse_options_default() {
let opts = TraverseOptions::default();
assert_eq!(opts.min_depth, 1);
assert_eq!(opts.direction, Direction::Outgoing);
assert!(opts.edge_type_filter.is_none());
assert!(opts.node_type_filter.is_none());
assert!(opts.at_epoch.is_none());
assert!(opts.decay_lambda.is_none());
assert!(opts.limit.is_none());
assert!(opts.cursor.is_none());
}
#[test]
fn test_shortest_path_options_default() {
let opts = ShortestPathOptions::default();
assert_eq!(opts.direction, Direction::Outgoing);
assert!(opts.type_filter.is_none());
assert!(opts.weight_field.is_none());
assert!(opts.at_epoch.is_none());
assert!(opts.max_depth.is_none());
assert!(opts.max_cost.is_none());
}
#[test]
fn test_component_options_default() {
let opts = ComponentOptions::default();
assert!(opts.edge_type_filter.is_none());
assert!(opts.node_type_filter.is_none());
assert!(opts.at_epoch.is_none());
}
#[test]
fn test_page_request_default() {
let req = PageRequest::default();
assert!(req.limit.is_none());
assert!(req.after.is_none());
}
#[test]
fn test_page_result_last_page() {
let result: PageResult<u64> = PageResult {
items: vec![1, 2, 3],
next_cursor: None,
};
assert_eq!(result.items.len(), 3);
assert!(result.next_cursor.is_none());
}
#[test]
fn test_page_result_has_more() {
let result: PageResult<u64> = PageResult {
items: vec![1, 2, 3],
next_cursor: Some(3),
};
assert_eq!(result.items.len(), 3);
assert_eq!(result.next_cursor, Some(3));
}
}