use crate::types::*;
use std::collections::{BTreeSet, HashMap};
use std::ops::ControlFlow;
/// Maps an `i64` onto a `u64` whose unsigned order matches the signed order
/// (`i64::MIN` -> `0`, `0` -> `1 << 63`, `i64::MAX` -> `u64::MAX`).
fn encode_signed_range_key(value: i64) -> u64 {
    const SIGN_BIT: u64 = 1 << 63;
    // Reinterpret the two's-complement bits, then flip the sign bit so that
    // negative values sort below non-negative ones.
    let raw = value as u64;
    raw ^ SIGN_BIT
}
/// Maps a finite `f64` onto a `u64` whose unsigned order matches numeric order.
///
/// Returns `None` for NaN and for both infinities. `-0.0` and `+0.0` encode to
/// the same key.
fn encode_float_range_key(value: f64) -> Option<u64> {
    const SIGN_BIT: u64 = 1 << 63;
    value.is_finite().then(|| {
        // `-0.0 == 0.0`, so this collapses both zeros onto the `+0.0` bit pattern.
        let canonical = if value == 0.0 { 0.0_f64 } else { value };
        let raw = canonical.to_bits();
        if raw & SIGN_BIT == 0 {
            // Non-negative: set the sign bit so these sort above every negative.
            raw | SIGN_BIT
        } else {
            // Negative: invert all bits so larger magnitudes sort lower.
            !raw
        }
    })
}
/// Maps a property value to its order-preserving `u64` key for a range index
/// of the given `domain`.
///
/// Returns `None` when the value's variant does not match the index domain
/// (for example a `UInt` value offered to an `Int` index), or when a float
/// value is not finite.
pub(crate) fn encode_range_prop_value(
    domain: SecondaryIndexRangeDomain,
    value: &PropValue,
) -> Option<u64> {
    match value {
        PropValue::Int(signed) if matches!(domain, SecondaryIndexRangeDomain::Int) => {
            Some(encode_signed_range_key(*signed))
        }
        PropValue::UInt(unsigned) if matches!(domain, SecondaryIndexRangeDomain::UInt) => {
            Some(*unsigned)
        }
        PropValue::Float(float) if matches!(domain, SecondaryIndexRangeDomain::Float) => {
            encode_float_range_key(*float)
        }
        _ => None,
    }
}
/// One edge as seen from a single endpoint in the memtable's adjacency maps.
///
/// Built from an `EdgeRecord` on upsert. In the outgoing map `neighbor_id` is
/// the edge's `to` node, and in the incoming map it is the edge's `from` node.
#[derive(Debug, Clone)]
pub struct AdjEntry {
/// Id of the edge this entry describes.
pub edge_id: u64,
/// Edge type id (copied from the edge record).
pub type_id: u32,
/// The node at the other end of the edge, relative to the map's key node.
pub neighbor_id: u64,
/// Edge weight (copied from the edge record).
pub weight: f32,
/// Copied from the edge record's `valid_from`.
pub valid_from: i64,
/// Copied from the edge record's `valid_to`.
pub valid_to: i64,
}
/// In-memory graph state built by replaying WAL operations through
/// `Memtable::apply_op`, together with the derived lookup indexes kept in
/// step with the primary `nodes` / `edges` maps.
#[derive(Clone)]
pub struct Memtable {
/// Live node records keyed by node id.
nodes: NodeIdMap<NodeRecord>,
/// Live edge records keyed by edge id.
edges: NodeIdMap<EdgeRecord>,
/// Tombstones (deletion time + write sequence) for deleted node ids.
deleted_nodes: NodeIdMap<TombstoneEntry>,
/// Tombstones (deletion time + write sequence) for deleted edge ids.
deleted_edges: NodeIdMap<TombstoneEntry>,
/// node type id -> node key -> node id.
node_key_index: HashMap<u32, HashMap<String, u64>>,
/// (from, to, edge type id) -> edge id.
edge_triple_index: HashMap<(u64, u64, u32), u64>,
/// source node id -> edge id -> entry whose `neighbor_id` is the edge target.
adj_out: NodeIdMap<NodeIdMap<AdjEntry>>,
/// target node id -> edge id -> entry whose `neighbor_id` is the edge source.
adj_in: NodeIdMap<NodeIdMap<AdjEntry>>,
/// node type id -> ids of live nodes of that type.
type_node_index: HashMap<u32, NodeIdSet>,
/// edge type id -> ids of live edges of that type.
type_edge_index: HashMap<u32, NodeIdSet>,
/// Ordered (node type id, `updated_at`, node id) tuples for time-range scans.
time_node_index: BTreeSet<(u32, i64, u64)>,
/// Registered secondary index declarations keyed by index id.
secondary_index_declarations: HashMap<u64, SecondaryIndexManifestEntry>,
/// node type id -> property key -> ids of equality indexes on that property.
secondary_eq_by_prop: HashMap<u32, HashMap<String, Vec<u64>>>,
/// node type id -> property key -> (range index id, value domain).
secondary_range_by_prop: HashMap<u32, HashMap<String, Vec<(u64, SecondaryIndexRangeDomain)>>>,
/// equality index id -> `hash_prop_value(value)` -> node ids holding that value.
secondary_eq_state: HashMap<u64, HashMap<u64, NodeIdSet>>,
/// range index id -> ordered (encoded value, node id) pairs.
secondary_range_state: HashMap<u64, BTreeSet<(u64, u64)>>,
}
impl Default for Memtable {
fn default() -> Self {
Self::new()
}
}
impl Memtable {
pub fn new() -> Self {
Memtable {
nodes: NodeIdMap::default(),
edges: NodeIdMap::default(),
deleted_nodes: NodeIdMap::default(),
deleted_edges: NodeIdMap::default(),
node_key_index: HashMap::new(),
edge_triple_index: HashMap::new(),
adj_out: NodeIdMap::default(),
adj_in: NodeIdMap::default(),
type_node_index: HashMap::new(),
type_edge_index: HashMap::new(),
time_node_index: BTreeSet::new(),
secondary_index_declarations: HashMap::new(),
secondary_eq_by_prop: HashMap::new(),
secondary_range_by_prop: HashMap::new(),
secondary_eq_state: HashMap::new(),
secondary_range_state: HashMap::new(),
}
}
pub fn apply_op(&mut self, op: &WalOp, last_write_seq: u64) {
match op {
WalOp::UpsertNode(node) => {
self.deleted_nodes.remove(&node.id);
let old_node = self.nodes.get(&node.id).cloned();
if let Some(old) = old_node.as_ref() {
self.remove_secondary_index_entries_for_node(old);
self.time_node_index
.remove(&(old.type_id, old.updated_at, old.id));
if old.type_id != node.type_id {
if let Some(set) = self.type_node_index.get_mut(&old.type_id) {
set.remove(&node.id);
if set.is_empty() {
self.type_node_index.remove(&old.type_id);
}
}
}
}
self.node_key_index
.entry(node.type_id)
.or_default()
.insert(node.key.clone(), node.id);
self.type_node_index
.entry(node.type_id)
.or_default()
.insert(node.id);
self.time_node_index
.insert((node.type_id, node.updated_at, node.id));
let mut stored = node.clone();
stored.last_write_seq = last_write_seq;
self.add_secondary_index_entries_for_node(&stored);
self.nodes.insert(node.id, stored);
}
WalOp::UpsertEdge(edge) => {
self.deleted_edges.remove(&edge.id);
if let Some(old) = self.edges.get(&edge.id) {
if old.type_id != edge.type_id {
if let Some(set) = self.type_edge_index.get_mut(&old.type_id) {
set.remove(&edge.id);
if set.is_empty() {
self.type_edge_index.remove(&old.type_id);
}
}
}
if old.from != edge.from || old.to != edge.to {
if let Some(map) = self.adj_out.get_mut(&old.from) {
map.remove(&edge.id);
if map.is_empty() {
self.adj_out.remove(&old.from);
}
}
if let Some(map) = self.adj_in.get_mut(&old.to) {
map.remove(&edge.id);
if map.is_empty() {
self.adj_in.remove(&old.to);
}
}
}
}
self.edge_triple_index
.insert((edge.from, edge.to, edge.type_id), edge.id);
self.adj_out.entry(edge.from).or_default().insert(
edge.id,
AdjEntry {
edge_id: edge.id,
type_id: edge.type_id,
neighbor_id: edge.to,
weight: edge.weight,
valid_from: edge.valid_from,
valid_to: edge.valid_to,
},
);
self.adj_in.entry(edge.to).or_default().insert(
edge.id,
AdjEntry {
edge_id: edge.id,
type_id: edge.type_id,
neighbor_id: edge.from,
weight: edge.weight,
valid_from: edge.valid_from,
valid_to: edge.valid_to,
},
);
self.type_edge_index
.entry(edge.type_id)
.or_default()
.insert(edge.id);
let mut stored = edge.clone();
stored.last_write_seq = last_write_seq;
self.edges.insert(edge.id, stored);
}
WalOp::DeleteNode { id, deleted_at } => {
if let Some(node) = self.nodes.remove(id) {
self.remove_secondary_index_entries_for_node(&node);
self.time_node_index
.remove(&(node.type_id, node.updated_at, node.id));
if let Some(set) = self.type_node_index.get_mut(&node.type_id) {
set.remove(&node.id);
if set.is_empty() {
self.type_node_index.remove(&node.type_id);
}
}
if let Some(inner) = self.node_key_index.get_mut(&node.type_id) {
inner.remove(&node.key);
if inner.is_empty() {
self.node_key_index.remove(&node.type_id);
}
}
}
self.adj_out.remove(id);
self.adj_in.remove(id);
self.deleted_nodes.insert(
*id,
TombstoneEntry {
deleted_at: *deleted_at,
last_write_seq,
},
);
}
WalOp::DeleteEdge { id, deleted_at } => {
if let Some(edge) = self.edges.remove(id) {
self.edge_triple_index
.remove(&(edge.from, edge.to, edge.type_id));
if let Some(set) = self.type_edge_index.get_mut(&edge.type_id) {
set.remove(&edge.id);
if set.is_empty() {
self.type_edge_index.remove(&edge.type_id);
}
}
if let Some(map) = self.adj_out.get_mut(&edge.from) {
map.remove(&edge.id);
if map.is_empty() {
self.adj_out.remove(&edge.from);
}
}
if let Some(map) = self.adj_in.get_mut(&edge.to) {
map.remove(&edge.id);
if map.is_empty() {
self.adj_in.remove(&edge.to);
}
}
}
self.deleted_edges.insert(
*id,
TombstoneEntry {
deleted_at: *deleted_at,
last_write_seq,
},
);
}
}
}
/// Adds `node_id` to every registered secondary index on type `type_id` that
/// covers one of the given properties.
///
/// Equality indexes bucket the node under `hash_prop_value(value)`. Range
/// indexes store `(encoded value, node_id)`; values whose variant does not
/// match the index domain, or non-finite floats, are skipped.
fn add_secondary_index_entries_for_props<'a, I>(&mut self, node_id: u64, type_id: u32, props: I)
where
    I: IntoIterator<Item = (&'a String, &'a PropValue)>,
{
    // Per-type lookup tables; these fields are only read here, while the
    // `*_state` fields are written, so the borrows are disjoint.
    let eq_by_prop = self.secondary_eq_by_prop.get(&type_id);
    let range_by_prop = self.secondary_range_by_prop.get(&type_id);
    for (prop_key, prop_value) in props {
        if let Some(eq_ids) = eq_by_prop.and_then(|by_prop| by_prop.get(prop_key.as_str())) {
            let hashed = hash_prop_value(prop_value);
            for index_id in eq_ids.iter().copied() {
                self.secondary_eq_state
                    .entry(index_id)
                    .or_default()
                    .entry(hashed)
                    .or_default()
                    .insert(node_id);
            }
        }
        if let Some(range_ids) = range_by_prop.and_then(|by_prop| by_prop.get(prop_key.as_str())) {
            for (index_id, domain) in range_ids.iter().copied() {
                let Some(encoded) = encode_range_prop_value(domain, prop_value) else {
                    continue;
                };
                self.secondary_range_state
                    .entry(index_id)
                    .or_default()
                    .insert((encoded, node_id));
            }
        }
    }
}
fn add_secondary_index_entries_for_node(&mut self, node: &NodeRecord) {
self.add_secondary_index_entries_for_props(node.id, node.type_id, node.props.iter());
}
/// Back-fills equality index `index_id` from the live nodes of `type_id`
/// that carry property `prop_key`.
///
/// Candidates are visited in ascending node-id order. Matches are staged
/// first and then inserted into `secondary_eq_state[index_id]`.
fn seed_secondary_equality_index_for_property(
    &mut self,
    index_id: u64,
    type_id: u32,
    prop_key: &str,
) {
    let mut members: Vec<u64> = match self.type_node_index.get(&type_id) {
        Some(ids) => ids.iter().copied().collect(),
        None => return,
    };
    members.sort_unstable();
    let staged: Vec<(u64, u64)> = members
        .into_iter()
        .filter_map(|node_id| {
            let value = self.nodes.get(&node_id)?.props.get(prop_key)?;
            Some((hash_prop_value(value), node_id))
        })
        .collect();
    let groups = self.secondary_eq_state.entry(index_id).or_default();
    for (value_hash, node_id) in staged {
        groups.entry(value_hash).or_default().insert(node_id);
    }
}
/// Back-fills range index `index_id` from the live nodes of `type_id` whose
/// `prop_key` value encodes under `domain`.
///
/// Candidates are visited in ascending node-id order. Values that do not fit
/// the domain are skipped.
fn seed_secondary_range_index_for_property(
    &mut self,
    index_id: u64,
    type_id: u32,
    prop_key: &str,
    domain: SecondaryIndexRangeDomain,
) {
    let mut members: Vec<u64> = match self.type_node_index.get(&type_id) {
        Some(ids) => ids.iter().copied().collect(),
        None => return,
    };
    members.sort_unstable();
    let staged: Vec<(u64, u64)> = members
        .into_iter()
        .filter_map(|node_id| {
            let value = self.nodes.get(&node_id)?.props.get(prop_key)?;
            encode_range_prop_value(domain, value).map(|encoded| (encoded, node_id))
        })
        .collect();
    self.secondary_range_state
        .entry(index_id)
        .or_default()
        .extend(staged);
}
/// Removes `node` from every secondary index on its type that covers one of
/// its properties.
///
/// Empty value buckets are dropped. When an index's whole state becomes
/// empty, its `secondary_eq_state` / `secondary_range_state` entry is
/// removed as well.
fn remove_secondary_index_entries_for_node(&mut self, node: &NodeRecord) {
    let (node_id, type_id) = (node.id, node.type_id);
    for (prop_key, prop_value) in node.props.iter() {
        let eq_ids = self
            .secondary_eq_by_prop
            .get(&type_id)
            .and_then(|by_prop| by_prop.get(prop_key.as_str()));
        if let Some(eq_ids) = eq_ids {
            let hashed = hash_prop_value(prop_value);
            for index_id in eq_ids.iter().copied() {
                let Some(groups) = self.secondary_eq_state.get_mut(&index_id) else {
                    continue;
                };
                let bucket_drained = match groups.get_mut(&hashed) {
                    Some(members) => {
                        members.remove(&node_id);
                        members.is_empty()
                    }
                    None => false,
                };
                if bucket_drained {
                    groups.remove(&hashed);
                }
                if groups.is_empty() {
                    self.secondary_eq_state.remove(&index_id);
                }
            }
        }
        let range_ids = self
            .secondary_range_by_prop
            .get(&type_id)
            .and_then(|by_prop| by_prop.get(prop_key.as_str()));
        if let Some(range_ids) = range_ids {
            for (index_id, domain) in range_ids.iter().copied() {
                let Some(encoded) = encode_range_prop_value(domain, prop_value) else {
                    continue;
                };
                let Some(entries) = self.secondary_range_state.get_mut(&index_id) else {
                    continue;
                };
                entries.remove(&(encoded, node_id));
                if entries.is_empty() {
                    self.secondary_range_state.remove(&index_id);
                }
            }
        }
    }
}
/// Registers a secondary index declaration and seeds it from the nodes
/// currently held in the memtable.
///
/// Registering an id that is already declared is a no-op, so an index's
/// state is never seeded twice.
pub fn register_secondary_index(&mut self, entry: &SecondaryIndexManifestEntry) {
    let index_id = entry.index_id;
    if self.secondary_index_declarations.contains_key(&index_id) {
        return;
    }
    self.secondary_index_declarations
        .insert(index_id, entry.clone());
    match &entry.target {
        SecondaryIndexTarget::NodeProperty { type_id, prop_key } => {
            let type_id = *type_id;
            let prop_key = prop_key.as_str();
            match &entry.kind {
                SecondaryIndexKind::Equality => {
                    let ids = self
                        .secondary_eq_by_prop
                        .entry(type_id)
                        .or_default()
                        .entry(prop_key.to_owned())
                        .or_default();
                    ids.push(index_id);
                    // Make sure the index has (possibly empty) state even
                    // when no node matches yet.
                    self.secondary_eq_state.entry(index_id).or_default();
                    self.seed_secondary_equality_index_for_property(index_id, type_id, prop_key);
                }
                SecondaryIndexKind::Range { domain } => {
                    let domain = *domain;
                    let indexes = self
                        .secondary_range_by_prop
                        .entry(type_id)
                        .or_default()
                        .entry(prop_key.to_owned())
                        .or_default();
                    indexes.push((index_id, domain));
                    self.secondary_range_state.entry(index_id).or_default();
                    self.seed_secondary_range_index_for_property(
                        index_id, type_id, prop_key, domain,
                    );
                }
            }
        }
    }
}
/// Removes a secondary index declaration together with its lookup entries
/// and state.
///
/// Returns `false` (and changes nothing) when `index_id` is not registered.
pub fn unregister_secondary_index(&mut self, index_id: u64) -> bool {
    let entry = match self.secondary_index_declarations.remove(&index_id) {
        Some(entry) => entry,
        None => return false,
    };
    match entry.target {
        SecondaryIndexTarget::NodeProperty { type_id, prop_key } => match entry.kind {
            SecondaryIndexKind::Equality => {
                let type_now_empty = match self.secondary_eq_by_prop.get_mut(&type_id) {
                    Some(by_prop) => {
                        let prop_now_empty = match by_prop.get_mut(&prop_key) {
                            Some(ids) => {
                                ids.retain(|&id| id != index_id);
                                ids.is_empty()
                            }
                            None => false,
                        };
                        if prop_now_empty {
                            by_prop.remove(&prop_key);
                        }
                        by_prop.is_empty()
                    }
                    None => false,
                };
                if type_now_empty {
                    self.secondary_eq_by_prop.remove(&type_id);
                }
                self.secondary_eq_state.remove(&index_id);
            }
            SecondaryIndexKind::Range { domain } => {
                let type_now_empty = match self.secondary_range_by_prop.get_mut(&type_id) {
                    Some(by_prop) => {
                        let prop_now_empty = match by_prop.get_mut(&prop_key) {
                            Some(indexes) => {
                                // Keep everything except the exact (id, domain) pair.
                                indexes.retain(|&(id, existing_domain)| {
                                    !(id == index_id && existing_domain == domain)
                                });
                                indexes.is_empty()
                            }
                            None => false,
                        };
                        if prop_now_empty {
                            by_prop.remove(&prop_key);
                        }
                        by_prop.is_empty()
                    }
                    None => false,
                };
                if type_now_empty {
                    self.secondary_range_by_prop.remove(&type_id);
                }
                self.secondary_range_state.remove(&index_id);
            }
        },
    }
    true
}
/// Returns the live node with `id`, or `None` if it is absent or tombstoned.
pub fn get_node(&self, id: u64) -> Option<&NodeRecord> {
    if self.deleted_nodes.contains_key(&id) {
        None
    } else {
        self.nodes.get(&id)
    }
}
/// Returns the live edge with `id`, or `None` if it is absent or tombstoned.
pub fn get_edge(&self, id: u64) -> Option<&EdgeRecord> {
    if self.deleted_edges.contains_key(&id) {
        None
    } else {
        self.edges.get(&id)
    }
}
/// Looks up the live node of type `type_id` whose key is `key`.
///
/// The resolved record is re-checked against the requested type and key. A
/// stale `node_key_index` entry (left behind when a node is re-upserted with
/// a different type or key) therefore yields `None` instead of an unrelated
/// node.
pub fn node_by_key(&self, type_id: u32, key: &str) -> Option<&NodeRecord> {
    let id = *self.node_key_index.get(&type_id)?.get(key)?;
    if self.deleted_nodes.contains_key(&id) {
        return None;
    }
    self.nodes
        .get(&id)
        .filter(|node| node.type_id == type_id && node.key == key)
}
/// Looks up the live edge `from -> to` of type `type_id`.
///
/// The resolved record is re-checked against the requested triple. A stale
/// `edge_triple_index` entry (left behind when an edge is re-upserted with
/// different endpoints or type) therefore yields `None` instead of an
/// unrelated edge.
pub fn edge_by_triple(&self, from: u64, to: u64, type_id: u32) -> Option<&EdgeRecord> {
    let id = *self.edge_triple_index.get(&(from, to, type_id))?;
    if self.deleted_edges.contains_key(&id) {
        return None;
    }
    self.edges
        .get(&id)
        .filter(|edge| edge.from == from && edge.to == to && edge.type_id == type_id)
}
/// Lists the neighbors of `node_id` recorded in this memtable.
///
/// Entries are skipped when their edge type is not in `type_filter` (if
/// given) or when the neighbor node is tombstoned. A tombstoned `node_id`
/// yields an empty list. `limit == 0` means unlimited.
///
/// For `Direction::Both`, a self-loop edge appears in both adjacency buckets
/// but is reported once. With a non-zero `limit`, the incoming pass
/// draws on the remaining budget for every admitted entry, including the
/// skipped self-loop duplicates.
pub fn neighbors(
    &self,
    node_id: u64,
    direction: Direction,
    type_filter: Option<&[u32]>,
    limit: usize,
) -> Vec<NeighborEntry> {
    if self.deleted_nodes.contains_key(&node_id) {
        return Vec::new();
    }
    // Shared admission test: edge type passes the filter and the far end is live.
    let admits = |entry: &AdjEntry| -> bool {
        if let Some(types) = type_filter {
            if !types.contains(&entry.type_id) {
                return false;
            }
        }
        !self.deleted_nodes.contains_key(&entry.neighbor_id)
    };
    let to_neighbor = |entry: &AdjEntry| NeighborEntry {
        node_id: entry.neighbor_id,
        edge_id: entry.edge_id,
        edge_type_id: entry.type_id,
        weight: entry.weight,
        valid_from: entry.valid_from,
        valid_to: entry.valid_to,
    };
    let collect_one = |bucket: Option<&NodeIdMap<AdjEntry>>, found: &mut Vec<NeighborEntry>| {
        for entry in bucket.into_iter().flat_map(|map| map.values()) {
            if limit > 0 && found.len() >= limit {
                break;
            }
            if admits(entry) {
                found.push(to_neighbor(entry));
            }
        }
    };

    let mut found = Vec::new();
    match direction {
        Direction::Outgoing => collect_one(self.adj_out.get(&node_id), &mut found),
        Direction::Incoming => collect_one(self.adj_in.get(&node_id), &mut found),
        Direction::Both => {
            // Self-loops live in both buckets; remember the ones already
            // reported from the outgoing side so they are emitted only once.
            let mut reported_loops = NodeIdSet::default();
            for entry in self.adj_out.get(&node_id).into_iter().flat_map(|map| map.values()) {
                if limit > 0 && found.len() >= limit {
                    break;
                }
                if !admits(entry) {
                    continue;
                }
                if entry.neighbor_id == node_id {
                    reported_loops.insert(entry.edge_id);
                }
                found.push(to_neighbor(entry));
            }
            // `None` = unlimited; otherwise the raw budget left for the incoming pass.
            let mut budget = if limit == 0 {
                None
            } else {
                Some(limit.saturating_sub(found.len()))
            };
            for entry in self.adj_in.get(&node_id).into_iter().flat_map(|map| map.values()) {
                if budget == Some(0) {
                    break;
                }
                if !admits(entry) {
                    continue;
                }
                if let Some(left) = budget.as_mut() {
                    *left -= 1;
                }
                if entry.neighbor_id == node_id && reported_loops.contains(&entry.edge_id) {
                    continue;
                }
                found.push(to_neighbor(entry));
            }
        }
    }
    found
}
/// Runs an unlimited `neighbors` query for each id in `node_ids`.
///
/// Only ids with at least one neighbor get an entry in the result. A
/// repeated id is simply recomputed and overwrites its earlier entry.
pub fn neighbors_batch(
    &self,
    node_ids: &[u64],
    direction: Direction,
    type_filter: Option<&[u32]>,
) -> NodeIdMap<Vec<NeighborEntry>> {
    let mut by_node = NodeIdMap::default();
    for nid in node_ids.iter().copied() {
        let found = self.neighbors(nid, direction, type_filter, 0);
        if found.is_empty() {
            continue;
        }
        by_node.insert(nid, found);
    }
    by_node
}
/// Streams the adjacency entries of `node_id` into `callback` as
/// `(edge_id, neighbor_id, weight, valid_from, valid_to)`.
///
/// Uses the same filtering as `neighbors`: edge type must be in
/// `type_filter` (if given), the neighbor must not be tombstoned, and a
/// tombstoned `node_id` visits nothing. For `Direction::Both`, a self-loop
/// is visited once.
///
/// Returns `ControlFlow::Break(())` as soon as the callback breaks, and
/// `ControlFlow::Continue(())` otherwise.
pub fn for_each_adj_entry<F>(
    &self,
    node_id: u64,
    direction: Direction,
    type_filter: Option<&[u32]>,
    callback: &mut F,
) -> ControlFlow<()>
where
    F: FnMut(u64, u64, f32, i64, i64) -> ControlFlow<()>,
{
    if self.deleted_nodes.contains_key(&node_id) {
        return ControlFlow::Continue(());
    }
    let admits = |entry: &AdjEntry| -> bool {
        if let Some(types) = type_filter {
            if !types.contains(&entry.type_id) {
                return false;
            }
        }
        !self.deleted_nodes.contains_key(&entry.neighbor_id)
    };
    let mut emit = |entry: &AdjEntry| {
        callback(
            entry.edge_id,
            entry.neighbor_id,
            entry.weight,
            entry.valid_from,
            entry.valid_to,
        )
    };
    let outgoing = self.adj_out.get(&node_id).into_iter().flat_map(|map| map.values());
    let incoming = self.adj_in.get(&node_id).into_iter().flat_map(|map| map.values());
    match direction {
        Direction::Outgoing => {
            for entry in outgoing {
                if admits(entry) {
                    emit(entry)?;
                }
            }
        }
        Direction::Incoming => {
            for entry in incoming {
                if admits(entry) {
                    emit(entry)?;
                }
            }
        }
        Direction::Both => {
            // Self-loops live in both buckets; visit them only from the outgoing side.
            let mut visited_loops = NodeIdSet::default();
            for entry in outgoing {
                if !admits(entry) {
                    continue;
                }
                if entry.neighbor_id == node_id {
                    visited_loops.insert(entry.edge_id);
                }
                emit(entry)?;
            }
            for entry in incoming {
                if !admits(entry) {
                    continue;
                }
                if entry.neighbor_id == node_id && visited_loops.contains(&entry.edge_id) {
                    continue;
                }
                emit(entry)?;
            }
        }
    }
    ControlFlow::Continue(())
}
/// Ids of the edges in `node_id`'s outgoing and incoming adjacency buckets,
/// sorted ascending and deduplicated (a self-loop sits in both buckets).
///
/// Tombstones are not consulted.
pub fn incident_edge_ids(&self, node_id: u64) -> Vec<u64> {
    let mut edge_ids: Vec<u64> = self
        .adj_out
        .get(&node_id)
        .into_iter()
        .chain(self.adj_in.get(&node_id))
        .flat_map(|bucket| bucket.keys().copied())
        .collect();
    edge_ids.sort_unstable();
    edge_ids.dedup();
    edge_ids
}
pub fn node_count(&self) -> usize {
self.nodes.len()
}
pub fn edge_count(&self) -> usize {
self.edges.len()
}
pub fn nodes(&self) -> &NodeIdMap<NodeRecord> {
&self.nodes
}
pub fn edges(&self) -> &NodeIdMap<EdgeRecord> {
&self.edges
}
pub fn deleted_nodes(&self) -> &NodeIdMap<TombstoneEntry> {
&self.deleted_nodes
}
pub fn deleted_edges(&self) -> &NodeIdMap<TombstoneEntry> {
&self.deleted_edges
}
pub fn adj_out(&self) -> &NodeIdMap<NodeIdMap<AdjEntry>> {
&self.adj_out
}
pub fn adj_in(&self) -> &NodeIdMap<NodeIdMap<AdjEntry>> {
&self.adj_in
}
/// Ids of live nodes of type `type_id`, in the index set's iteration order
/// (not sorted).
pub fn nodes_by_type(&self, type_id: u32) -> Vec<u64> {
    match self.type_node_index.get(&type_id) {
        Some(ids) => ids.iter().copied().collect(),
        None => Vec::new(),
    }
}
/// Ids of live edges of type `type_id`, in the index set's iteration order
/// (not sorted).
pub fn edges_by_type(&self, type_id: u32) -> Vec<u64> {
    match self.type_edge_index.get(&type_id) {
        Some(ids) => ids.iter().copied().collect(),
        None => Vec::new(),
    }
}
/// Node type id -> ids of live nodes of that type.
pub fn type_node_index(&self) -> &HashMap<u32, NodeIdSet> {
    &self.type_node_index
}
/// Edge type id -> ids of live edges of that type.
pub fn type_edge_index(&self) -> &HashMap<u32, NodeIdSet> {
    &self.type_edge_index
}
/// Registered secondary index declarations keyed by index id.
pub fn secondary_index_declarations(&self) -> &HashMap<u64, SecondaryIndexManifestEntry> {
    &self.secondary_index_declarations
}
/// Equality index id -> `hash_prop_value(value)` -> node ids.
pub fn secondary_eq_state(&self) -> &HashMap<u64, HashMap<u64, NodeIdSet>> {
    &self.secondary_eq_state
}
/// Range index id -> ordered `(encoded value, node id)` pairs.
pub fn secondary_range_state(&self) -> &HashMap<u64, BTreeSet<(u64, u64)>> {
    &self.secondary_range_state
}
/// Ordered `(node type id, updated_at, node id)` tuples.
pub fn time_node_index(&self) -> &BTreeSet<(u32, i64, u64)> {
    &self.time_node_index
}
/// Ids of nodes of `type_id` whose `updated_at` lies in
/// `from_ms..=to_ms` (inclusive on both ends), sorted ascending by id.
///
/// An inverted range (`from_ms > to_ms`) yields an empty list.
pub fn nodes_by_time_range(&self, type_id: u32, from_ms: i64, to_ms: i64) -> Vec<u64> {
    // Guarding here also keeps `BTreeSet::range` from panicking on start > end.
    if to_ms < from_ms {
        return Vec::new();
    }
    let lower = (type_id, from_ms, u64::MIN);
    let upper = (type_id, to_ms, u64::MAX);
    let mut ids = Vec::new();
    ids.extend(
        self.time_node_index
            .range(lower..=upper)
            .map(|&(_, _, node_id)| node_id),
    );
    ids.sort_unstable();
    ids
}
/// Linear scan for live nodes of `type_id` whose `prop_key` equals
/// `prop_value`.
///
/// Results come back in the type index's iteration order (not sorted).
pub fn find_nodes(&self, type_id: u32, prop_key: &str, prop_value: &PropValue) -> Vec<u64> {
    let mut hits = Vec::new();
    if let Some(candidates) = self.type_node_index.get(&type_id) {
        for &candidate in candidates.iter() {
            let current = self
                .nodes
                .get(&candidate)
                .and_then(|node| node.props.get(prop_key));
            if current == Some(prop_value) {
                hits.push(candidate);
            }
        }
    }
    hits
}
/// Looks up nodes through equality index `index_id`, sorted ascending by id.
///
/// The index buckets nodes by `hash_prop_value`, so each candidate's actual
/// `prop_key` value is re-compared with `prop_value` to discard hash
/// collisions and stale entries.
pub fn find_secondary_eq_nodes(
    &self,
    index_id: u64,
    prop_key: &str,
    prop_value: &PropValue,
) -> Vec<u64> {
    let Some(bucket) = self
        .secondary_eq_state
        .get(&index_id)
        .and_then(|groups| groups.get(&hash_prop_value(prop_value)))
    else {
        return Vec::new();
    };
    let mut hits: Vec<u64> = Vec::new();
    for &candidate in bucket.iter() {
        let current = self
            .nodes
            .get(&candidate)
            .and_then(|node| node.props.get(prop_key));
        if current == Some(prop_value) {
            hits.push(candidate);
        }
    }
    hits.sort_unstable();
    hits
}
/// Heuristic byte estimate of the memtable's footprint.
///
/// Built from fixed per-entry constants (for example 120 per node, 80 per
/// property, 48 per adjacency entry) plus key/vector/string lengths, rather
/// than measured allocation sizes.
pub fn estimated_size(&self) -> usize {
    const F32_BYTES: usize = std::mem::size_of::<f32>();
    const U32_BYTES: usize = std::mem::size_of::<u32>();
    let mut total: usize = 0;

    // Node records: fixed overhead, key bytes, per-property guess, vector payloads.
    for node in self.nodes.values() {
        let dense = node
            .dense_vector
            .as_ref()
            .map_or(0, |values| values.len() * F32_BYTES);
        let sparse = node
            .sparse_vector
            .as_ref()
            .map_or(0, |values| values.len() * (U32_BYTES + F32_BYTES));
        total += 120 + node.key.len() + node.props.len() * 80 + dense + sparse;
    }
    // Edge records.
    for edge in self.edges.values() {
        total += 100 + edge.props.len() * 80;
    }
    // Tombstones.
    total += (self.deleted_nodes.len() + self.deleted_edges.len()) * 16;
    // Adjacency buckets in both directions.
    for bucket in self.adj_out.values().chain(self.adj_in.values()) {
        total += bucket.len() * 48;
    }
    // Type indexes for nodes and edges.
    for ids in self
        .type_node_index
        .values()
        .chain(self.type_edge_index.values())
    {
        total += ids.len() * 16;
    }
    // Time index.
    total += self.time_node_index.len() * 48;
    // Secondary index declarations.
    for decl in self.secondary_index_declarations.values() {
        let prop_key_len = match &decl.target {
            SecondaryIndexTarget::NodeProperty { prop_key, .. } => prop_key.len(),
        };
        let error_len = decl.last_error.as_ref().map_or(0, |msg| msg.len());
        total += 96 + prop_key_len + error_len;
    }
    // Equality lookup tables.
    for by_prop in self.secondary_eq_by_prop.values() {
        total += 32;
        for (prop_key, index_ids) in by_prop.iter() {
            total += 48 + prop_key.len() + index_ids.len() * 8;
        }
    }
    // Range lookup tables.
    for by_prop in self.secondary_range_by_prop.values() {
        total += 32;
        for (prop_key, indexes) in by_prop.iter() {
            total += 48 + prop_key.len() + indexes.len() * 16;
        }
    }
    // Equality index state.
    for groups in self.secondary_eq_state.values() {
        total += 32;
        for ids in groups.values() {
            total += 32 + ids.len() * 16;
        }
    }
    // Range index state.
    for entries in self.secondary_range_state.values() {
        total += 32 + entries.len() * 16;
    }
    total
}
/// `true` when the memtable holds no live records and no tombstones.
///
/// Registered secondary index declarations are not considered.
pub fn is_empty(&self) -> bool {
    [
        self.nodes.len(),
        self.edges.len(),
        self.deleted_nodes.len(),
        self.deleted_edges.len(),
    ]
    .iter()
    .all(|&count| count == 0)
}
/// Largest node id present, counting tombstoned ids; `0` when there are none.
pub fn max_node_id(&self) -> u64 {
    self.nodes
        .keys()
        .chain(self.deleted_nodes.keys())
        .copied()
        .max()
        .unwrap_or(0)
}
/// Largest edge id present, counting tombstoned ids; `0` when there are none.
pub fn max_edge_id(&self) -> u64 {
    self.edges
        .keys()
        .chain(self.deleted_edges.keys())
        .copied()
        .max()
        .unwrap_or(0)
}
}
/// Test-only probes for checking that empty index buckets are cleaned up.
#[cfg(test)]
impl Memtable {
    /// Number of distinct node type ids with an entry in the type index.
    fn type_node_index_key_count(&self) -> usize {
        HashMap::len(&self.type_node_index)
    }
    /// Number of distinct edge type ids with an entry in the type index.
    fn type_edge_index_key_count(&self) -> usize {
        HashMap::len(&self.type_edge_index)
    }
    /// Number of distinct node type ids with an entry in the key index.
    fn node_key_index_key_count(&self) -> usize {
        HashMap::len(&self.node_key_index)
    }
    /// Number of nodes that currently own an outgoing adjacency bucket.
    fn adj_out_key_count(&self) -> usize {
        NodeIdMap::len(&self.adj_out)
    }
    /// Number of nodes that currently own an incoming adjacency bucket.
    fn adj_in_key_count(&self) -> usize {
        NodeIdMap::len(&self.adj_in)
    }
    /// Number of `(type, updated_at, id)` tuples in the time index.
    fn time_node_index_len(&self) -> usize {
        BTreeSet::len(&self.time_node_index)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeMap;
fn make_node(id: u64, type_id: u32, key: &str) -> NodeRecord {
NodeRecord {
id,
type_id,
key: key.to_string(),
props: BTreeMap::new(),
created_at: 1000,
updated_at: 1001,
weight: 0.5,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
}
}
fn make_edge(id: u64, from: u64, to: u64, type_id: u32) -> EdgeRecord {
EdgeRecord {
id,
from,
to,
type_id,
props: BTreeMap::new(),
created_at: 2000,
updated_at: 2001,
weight: 1.0,
valid_from: 0,
valid_to: i64::MAX,
last_write_seq: 0,
}
}
fn make_node_with_props(
id: u64,
type_id: u32,
key: &str,
props: BTreeMap<String, PropValue>,
) -> NodeRecord {
NodeRecord {
id,
type_id,
key: key.to_string(),
props,
created_at: 1000,
updated_at: 1001,
weight: 0.5,
dense_vector: None,
sparse_vector: None,
last_write_seq: 0,
}
}
#[test]
fn test_basic_insert_and_get() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
assert_eq!(mt.node_count(), 1);
assert_eq!(mt.edge_count(), 1);
assert_eq!(mt.get_node(1).unwrap().key, "alice");
assert_eq!(mt.get_edge(1).unwrap().from, 1);
}
#[test]
fn test_node_key_index() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "bob")), 0);
assert_eq!(mt.node_by_key(1, "alice").unwrap().id, 1);
assert_eq!(mt.node_by_key(1, "bob").unwrap().id, 2);
assert!(mt.node_by_key(1, "charlie").is_none());
assert!(mt.node_by_key(2, "alice").is_none()); }
#[test]
fn test_edge_triple_index() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(2, 1, 2, 20)), 0);
assert_eq!(mt.edge_by_triple(1, 2, 10).unwrap().id, 1);
assert_eq!(mt.edge_by_triple(1, 2, 20).unwrap().id, 2);
assert!(mt.edge_by_triple(1, 2, 30).is_none());
}
#[test]
fn test_upsert_overwrites_key_index() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
let mut updated = make_node(1, 1, "alice");
updated.weight = 0.99;
mt.apply_op(&WalOp::UpsertNode(updated), 0);
assert_eq!(mt.node_count(), 1);
assert!((mt.get_node(1).unwrap().weight - 0.99).abs() < f32::EPSILON);
assert_eq!(mt.node_by_key(1, "alice").unwrap().id, 1);
}
#[test]
fn test_delete_removes_from_indexes() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
mt.apply_op(
&WalOp::DeleteNode {
id: 1,
deleted_at: 9999,
},
0,
);
mt.apply_op(
&WalOp::DeleteEdge {
id: 1,
deleted_at: 9999,
},
0,
);
assert!(mt.get_node(1).is_none());
assert!(mt.get_edge(1).is_none());
assert!(mt.node_by_key(1, "alice").is_none());
assert!(mt.edge_by_triple(1, 2, 10).is_none());
assert_eq!(mt.node_count(), 0);
assert_eq!(mt.edge_count(), 0);
}
#[test]
fn test_max_ids() {
let mut mt = Memtable::new();
assert_eq!(mt.max_node_id(), 0);
assert_eq!(mt.max_edge_id(), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(42, 1, "high")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(99, 1, 2, 10)), 0);
assert_eq!(mt.max_node_id(), 42);
assert_eq!(mt.max_edge_id(), 99);
mt.apply_op(
&WalOp::DeleteNode {
id: 42,
deleted_at: 9999,
},
0,
);
assert_eq!(mt.max_node_id(), 42); }
#[test]
fn test_re_upsert_after_delete() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
mt.apply_op(
&WalOp::DeleteNode {
id: 1,
deleted_at: 9999,
},
0,
);
assert!(mt.get_node(1).is_none());
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice_v2")), 0);
assert_eq!(mt.get_node(1).unwrap().key, "alice_v2");
assert_eq!(mt.node_by_key(1, "alice_v2").unwrap().id, 1);
}
#[test]
fn test_adjacency_built_on_edge_insert() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "bob")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
let out = mt.neighbors(1, Direction::Outgoing, None, 0);
assert_eq!(out.len(), 1);
assert_eq!(out[0].node_id, 2);
assert_eq!(out[0].edge_id, 1);
assert_eq!(out[0].edge_type_id, 10);
let inc = mt.neighbors(2, Direction::Incoming, None, 0);
assert_eq!(inc.len(), 1);
assert_eq!(inc[0].node_id, 1);
assert!(mt.neighbors(1, Direction::Incoming, None, 0).is_empty());
assert!(mt.neighbors(2, Direction::Outgoing, None, 0).is_empty());
}
#[test]
fn test_for_each_adj_entry_breaks_early() {
let mut mt = Memtable::new();
for id in 1..=4 {
mt.apply_op(&WalOp::UpsertNode(make_node(id, 1, &format!("n{}", id))), 0);
}
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(2, 1, 3, 10)), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(3, 1, 4, 10)), 0);
let mut seen = 0usize;
let flow = mt.for_each_adj_entry(
1,
Direction::Outgoing,
None,
&mut |_edge_id, _neighbor_id, _weight, _valid_from, _valid_to| {
seen += 1;
ControlFlow::Break(())
},
);
assert!(matches!(flow, ControlFlow::Break(())));
assert_eq!(seen, 1);
}
#[test]
fn test_neighbors_with_type_filter() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "b")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(3, 1, "c")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0); mt.apply_op(&WalOp::UpsertEdge(make_edge(2, 1, 3, 20)), 0);
let all = mt.neighbors(1, Direction::Outgoing, None, 0);
assert_eq!(all.len(), 2);
let typed = mt.neighbors(1, Direction::Outgoing, Some(&[10]), 0);
assert_eq!(typed.len(), 1);
assert_eq!(typed[0].node_id, 2);
let typed = mt.neighbors(1, Direction::Outgoing, Some(&[20]), 0);
assert_eq!(typed.len(), 1);
assert_eq!(typed[0].node_id, 3);
let typed = mt.neighbors(1, Direction::Outgoing, Some(&[99]), 0);
assert!(typed.is_empty());
}
#[test]
fn test_neighbors_with_limit() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "hub")), 0);
for i in 2..=6 {
mt.apply_op(&WalOp::UpsertNode(make_node(i, 1, &format!("n{}", i))), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(i - 1, 1, i, 10)), 0);
}
let limited = mt.neighbors(1, Direction::Outgoing, None, 3);
assert_eq!(limited.len(), 3);
let all = mt.neighbors(1, Direction::Outgoing, None, 0);
assert_eq!(all.len(), 5);
}
#[test]
fn test_neighbors_both_direction() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "b")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(3, 1, "c")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0); mt.apply_op(&WalOp::UpsertEdge(make_edge(2, 3, 1, 10)), 0);
let both = mt.neighbors(1, Direction::Both, None, 0);
assert_eq!(both.len(), 2);
let neighbor_ids: Vec<u64> = both.iter().map(|e| e.node_id).collect();
assert!(neighbor_ids.contains(&2));
assert!(neighbor_ids.contains(&3));
}
#[test]
fn test_neighbors_both_with_limit_preserves_self_loop_budget_semantics() {
let mut mt = Memtable::new();
mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "b")), 0);
mt.apply_op(&WalOp::UpsertNode(make_node(3, 1, "c")), 0);
mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 1, 10)), 0); mt.apply_op(&WalOp::UpsertEdge(make_edge(2, 2, 1, 10)), 0); mt.apply_op(&WalOp::UpsertEdge(make_edge(3, 3, 1, 10)), 0);
let both = mt.neighbors(1, Direction::Both, None, 2);
assert_eq!(both.len(), 1);
assert_eq!(both[0].edge_id, 1);
}
/// Deleting an edge clears it from both the source's outgoing and the target's
/// incoming adjacency.
#[test]
fn test_delete_edge_removes_from_adjacency() {
    let mut mt = Memtable::new();
    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 0);
    mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "b")), 0);
    mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
    let outgoing_before = mt.neighbors(1, Direction::Outgoing, None, 0);
    assert_eq!(outgoing_before.len(), 1);

    let delete = WalOp::DeleteEdge {
        id: 1,
        deleted_at: 9999,
    };
    mt.apply_op(&delete, 0);

    assert!(mt.neighbors(1, Direction::Outgoing, None, 0).is_empty());
    assert!(mt.neighbors(2, Direction::Incoming, None, 0).is_empty());
}
/// After a node is deleted, it no longer appears in the outgoing neighbors of a
/// node that pointed at it.
#[test]
fn test_deleted_node_excluded_from_neighbors() {
    let mut mt = Memtable::new();
    for (id, key) in [(1, "a"), (2, "b"), (3, "c")] {
        mt.apply_op(&WalOp::UpsertNode(make_node(id, 1, key)), 0);
    }
    // Node 1 fans out to nodes 2 and 3.
    for (edge_id, target) in [(1, 2), (2, 3)] {
        mt.apply_op(&WalOp::UpsertEdge(make_edge(edge_id, 1, target, 10)), 0);
    }
    assert_eq!(mt.neighbors(1, Direction::Outgoing, None, 0).len(), 2);

    let delete_b = WalOp::DeleteNode {
        id: 2,
        deleted_at: 9999,
    };
    mt.apply_op(&delete_b, 0);

    let remaining = mt.neighbors(1, Direction::Outgoing, None, 0);
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].node_id, 3);
}
/// Re-upserting an edge with the same id replaces its adjacency entry instead of
/// adding a second one.
#[test]
fn test_adjacency_idempotent_on_edge_upsert() {
    let mut mt = Memtable::new();
    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 0);
    mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "b")), 0);

    let mut edge = make_edge(1, 1, 2, 10);
    mt.apply_op(&WalOp::UpsertEdge(edge.clone()), 0);
    // Same id, new weight: the adjacency entry should carry the updated weight.
    edge.weight = 0.9;
    mt.apply_op(&WalOp::UpsertEdge(edge), 0);

    let outgoing = mt.neighbors(1, Direction::Outgoing, None, 0);
    assert_eq!(outgoing.len(), 1);
    let weight_error = (outgoing[0].weight - 0.9).abs();
    assert!(weight_error < f32::EPSILON);
}
/// Nodes are grouped by type id; an unknown type yields an empty result.
#[test]
fn test_type_node_index_basic() {
    let mut mt = Memtable::new();
    for (id, type_id, key) in [(1, 1, "alice"), (2, 1, "bob"), (3, 2, "charlie")] {
        mt.apply_op(&WalOp::UpsertNode(make_node(id, type_id, key)), 0);
    }
    let mut type_one: Vec<u64> = mt.nodes_by_type(1);
    type_one.sort_unstable();
    assert_eq!(type_one, vec![1, 2]);
    assert_eq!(mt.nodes_by_type(2), vec![3]);
    assert!(mt.nodes_by_type(99).is_empty());
}
/// Edges are grouped by edge type; an unknown type yields an empty result.
#[test]
fn test_type_edge_index_basic() {
    let mut mt = Memtable::new();
    // (edge id, from, to, edge type). The endpoint nodes are never inserted here.
    for (id, from, to, edge_type) in [(1, 1, 2, 10), (2, 2, 3, 20), (3, 3, 4, 10)] {
        mt.apply_op(&WalOp::UpsertEdge(make_edge(id, from, to, edge_type)), 0);
    }
    let mut type_ten: Vec<u64> = mt.edges_by_type(10);
    type_ten.sort_unstable();
    assert_eq!(type_ten, vec![1, 3]);
    assert_eq!(mt.edges_by_type(20), vec![2]);
    assert!(mt.edges_by_type(99).is_empty());
}
/// Deletes remove ids from the node-type and edge-type indexes.
#[test]
fn test_type_index_updated_on_delete() {
    let mut mt = Memtable::new();
    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
    mt.apply_op(&WalOp::UpsertNode(make_node(2, 1, "bob")), 0);
    mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);

    let delete_alice = WalOp::DeleteNode {
        id: 1,
        deleted_at: 9999,
    };
    mt.apply_op(&delete_alice, 0);
    assert_eq!(mt.nodes_by_type(1), vec![2]);

    let delete_edge = WalOp::DeleteEdge {
        id: 1,
        deleted_at: 9999,
    };
    mt.apply_op(&delete_edge, 0);
    assert!(mt.edges_by_type(10).is_empty());
}
/// Type-index keys whose id sets become empty after deletes are removed, not left
/// as empty entries.
#[test]
fn test_empty_index_sets_pruned_after_deletes() {
    let mut mt = Memtable::new();
    let delete_node = |mt: &mut Memtable, id| {
        mt.apply_op(&WalOp::DeleteNode { id, deleted_at: 9999 }, 0);
    };

    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
    mt.apply_op(&WalOp::UpsertNode(make_node(2, 2, "bob")), 0);
    mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
    assert_eq!(mt.type_node_index_key_count(), 2);
    assert_eq!(mt.type_edge_index_key_count(), 1);

    delete_node(&mut mt, 1);
    assert_eq!(mt.type_node_index_key_count(), 1);

    mt.apply_op(&WalOp::DeleteEdge { id: 1, deleted_at: 9999 }, 0);
    assert_eq!(mt.type_edge_index_key_count(), 0);

    delete_node(&mut mt, 2);
    assert_eq!(mt.type_node_index_key_count(), 0);
}
/// Re-upserting an edge under a new type moves it to the new type's key and drops
/// the old, now-empty key.
#[test]
fn test_edge_type_index_pruned_on_type_change() {
    let mut mt = Memtable::new();
    for (id, key) in [(1, "alice"), (2, "bob")] {
        mt.apply_op(&WalOp::UpsertNode(make_node(id, 1, key)), 0);
    }
    // Same edge id upserted as type 10, then as type 20.
    for edge_type in [10, 20] {
        mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, edge_type)), 0);
        // Exactly one type key exists before and after the type change.
        assert_eq!(mt.type_edge_index_key_count(), 1);
    }
    assert!(mt.edges_by_type(10).is_empty());
    assert_eq!(mt.edges_by_type(20), vec![1]);
}
/// The node key index shrinks to empty as its nodes (of distinct types) are deleted.
#[test]
fn test_node_key_index_pruned_after_delete() {
    let mut mt = Memtable::new();
    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
    mt.apply_op(&WalOp::UpsertNode(make_node(2, 2, "bob")), 0);
    assert_eq!(mt.node_key_index_key_count(), 2);
    // (node to delete, key count expected afterwards)
    for (id, remaining) in [(1, 1), (2, 0)] {
        mt.apply_op(&WalOp::DeleteNode { id, deleted_at: 9999 }, 0);
        assert_eq!(mt.node_key_index_key_count(), remaining);
    }
}
/// Removing the only edge between two nodes leaves no empty outgoing or incoming
/// adjacency keys behind.
#[test]
fn test_adjacency_lists_pruned_after_edge_delete() {
    let mut mt = Memtable::new();
    for (id, key) in [(1, "alice"), (2, "bob")] {
        mt.apply_op(&WalOp::UpsertNode(make_node(id, 1, key)), 0);
    }
    mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
    assert_eq!((mt.adj_out_key_count(), mt.adj_in_key_count()), (1, 1));

    mt.apply_op(&WalOp::DeleteEdge { id: 1, deleted_at: 9999 }, 0);
    assert_eq!((mt.adj_out_key_count(), mt.adj_in_key_count()), (0, 0));
}
/// A node id that was deleted reappears in the type index when it is upserted again.
#[test]
fn test_type_index_re_upsert_after_delete() {
    let mut mt = Memtable::new();
    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice")), 0);
    mt.apply_op(&WalOp::DeleteNode { id: 1, deleted_at: 9999 }, 0);
    assert!(mt.nodes_by_type(1).is_empty());

    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "alice_v2")), 0);
    assert_eq!(mt.nodes_by_type(1), vec![1]);
}
/// Equality lookups on a string property, scoped by node type: matching values are
/// found, while an absent value, an absent property, or a different type all miss.
#[test]
fn test_prop_index_basic_lookup() {
    let mut mt = Memtable::new();
    let string_prop = |value: &str| PropValue::String(value.to_string());
    // Two red nodes and one green node, all of type 1. Each props map is moved into
    // its node record, so no clone is needed.
    for (id, key, value) in [(1, "apple", "red"), (2, "cherry", "red"), (3, "lime", "green")] {
        let props = BTreeMap::from([("color".to_string(), string_prop(value))]);
        mt.apply_op(&WalOp::UpsertNode(make_node_with_props(id, 1, key, props)), 0);
    }

    let mut reds = mt.find_nodes(1, "color", &string_prop("red"));
    reds.sort();
    assert_eq!(reds, vec![1, 2]);
    assert_eq!(mt.find_nodes(1, "color", &string_prop("green")), vec![3]);

    assert!(mt.find_nodes(1, "color", &string_prop("blue")).is_empty());
    assert!(mt.find_nodes(1, "shape", &string_prop("round")).is_empty());
    assert!(mt.find_nodes(2, "color", &string_prop("red")).is_empty());
}
/// Overwriting a node's property value moves the node from the old value's lookup
/// to the new one's.
#[test]
fn test_prop_index_updated_on_upsert() {
    let mut mt = Memtable::new();
    let active = PropValue::String("active".to_string());
    let inactive = PropValue::String("inactive".to_string());
    let status_props =
        |value: &PropValue| BTreeMap::from([("status".to_string(), value.clone())]);

    let first = make_node_with_props(1, 1, "item", status_props(&active));
    mt.apply_op(&WalOp::UpsertNode(first), 0);
    assert_eq!(mt.find_nodes(1, "status", &active).len(), 1);

    let second = make_node_with_props(1, 1, "item", status_props(&inactive));
    mt.apply_op(&WalOp::UpsertNode(second), 0);
    assert!(mt.find_nodes(1, "status", &active).is_empty());
    assert_eq!(mt.find_nodes(1, "status", &inactive), vec![1]);
}
/// Deleting a node removes it from property lookups.
#[test]
fn test_prop_index_cleaned_on_delete() {
    let mut mt = Memtable::new();
    let red = PropValue::String("red".to_string());
    let props = BTreeMap::from([("color".to_string(), red.clone())]);
    mt.apply_op(&WalOp::UpsertNode(make_node_with_props(1, 1, "apple", props)), 0);
    assert_eq!(mt.find_nodes(1, "color", &red).len(), 1);

    mt.apply_op(&WalOp::DeleteNode { id: 1, deleted_at: 9999 }, 0);
    assert!(mt.find_nodes(1, "color", &red).is_empty());
}
/// Each property of a single node can be looked up independently.
#[test]
fn test_prop_index_multiple_props_per_node() {
    let mut mt = Memtable::new();
    let red = PropValue::String("red".to_string());
    let props = BTreeMap::from([
        ("color".to_string(), red.clone()),
        ("size".to_string(), PropValue::Int(42)),
    ]);
    mt.apply_op(&WalOp::UpsertNode(make_node_with_props(1, 1, "item", props)), 0);

    assert_eq!(mt.find_nodes(1, "color", &red), vec![1]);
    assert_eq!(mt.find_nodes(1, "size", &PropValue::Int(42)), vec![1]);
    assert!(mt.find_nodes(1, "size", &PropValue::Int(99)).is_empty());
}
/// When a deleted node is recreated with different props, only the new props are
/// findable.
#[test]
fn test_prop_index_re_upsert_after_delete() {
    let mut mt = Memtable::new();
    let tag_a = PropValue::String("a".to_string());
    let tag_b = PropValue::String("b".to_string());

    let first_props = BTreeMap::from([("tag".to_string(), tag_a.clone())]);
    mt.apply_op(&WalOp::UpsertNode(make_node_with_props(1, 1, "item", first_props)), 0);
    mt.apply_op(&WalOp::DeleteNode { id: 1, deleted_at: 9999 }, 0);
    assert!(mt.find_nodes(1, "tag", &tag_a).is_empty());

    let second_props = BTreeMap::from([("tag".to_string(), tag_b.clone())]);
    mt.apply_op(&WalOp::UpsertNode(make_node_with_props(1, 1, "item_v2", second_props)), 0);
    assert!(mt.find_nodes(1, "tag", &tag_a).is_empty());
    assert_eq!(mt.find_nodes(1, "tag", &tag_b), vec![1]);
}
/// Inserting nodes and an edge grows `estimated_size`, which clears a small lower bound.
#[test]
fn test_estimated_size_includes_type_indexes() {
    let mut mt = Memtable::new();
    let size_empty = mt.estimated_size();
    for (id, type_id, key) in [(1, 1, "alice"), (2, 2, "bob")] {
        mt.apply_op(&WalOp::UpsertNode(make_node(id, type_id, key)), 0);
    }
    mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 10)), 0);
    let size_with_data = mt.estimated_size();
    assert!(size_with_data > size_empty);
    assert!(
        size_with_data >= 48,
        "estimated_size should include type index overhead"
    );
}
/// Dense and sparse vector payloads add at least their raw byte size to
/// `estimated_size`.
#[test]
fn test_estimated_size_includes_vector_bytes() {
    let mut mt = Memtable::new();
    let base = mt.estimated_size();
    let node = {
        let mut node = make_node(1, 1, "vector-node");
        node.dense_vector = Some(vec![0.1, 0.2, 0.3, 0.4]);
        node.sparse_vector = Some(vec![(1, 1.0), (9, 2.0)]);
        node
    };
    mt.apply_op(&WalOp::UpsertNode(node), 0);
    let sized = mt.estimated_size();
    // Lower bound: 4 dense f32s plus 2 sparse entries counted as u32 + f32 each.
    let dense_bytes = 4 * std::mem::size_of::<f32>();
    let sparse_bytes = 2 * (std::mem::size_of::<u32>() + std::mem::size_of::<f32>());
    assert!(
        sized >= base + dense_bytes + sparse_bytes,
        "estimated_size should account for dense and sparse vector payload bytes"
    );
}
/// Builds a property-less `NodeRecord` whose `updated_at` is caller-chosen, for the
/// time-index tests. Every other field is fixed: `created_at` 1000, weight 0.5, no
/// dense or sparse vector, and `last_write_seq` 0.
fn make_node_at(id: u64, type_id: u32, key: &str, updated_at: i64) -> NodeRecord {
    NodeRecord {
        id,
        type_id,
        key: key.to_owned(),
        props: BTreeMap::default(),
        created_at: 1000,
        updated_at,
        weight: 0.5,
        dense_vector: None,
        sparse_vector: None,
        last_write_seq: 0,
    }
}
/// Time-range queries return only nodes of the requested type.
#[test]
fn test_time_index_insert_and_query() {
    let mut mt = Memtable::new();
    // (id, type, key, updated_at)
    let rows = [(1, 1, "a", 100), (2, 1, "b", 200), (3, 2, "c", 150)];
    for (id, type_id, key, updated_at) in rows {
        mt.apply_op(&WalOp::UpsertNode(make_node_at(id, type_id, key, updated_at)), 0);
    }
    assert_eq!(mt.time_node_index_len(), 3);

    let type_one = mt.nodes_by_time_range(1, 0, 300);
    assert_eq!(type_one.len(), 2);
    assert!(type_one.contains(&1));
    assert!(type_one.contains(&2));

    let type_two = mt.nodes_by_time_range(2, 0, 300);
    assert_eq!(type_two.len(), 1);
    assert!(type_two.contains(&3));
}
/// Range bounds are inclusive at both ends, and an inverted range matches nothing.
#[test]
fn test_time_index_range_boundaries() {
    let mut mt = Memtable::new();
    for (id, key, updated_at) in [(1, "a", 100), (2, "b", 200), (3, "c", 300)] {
        mt.apply_op(&WalOp::UpsertNode(make_node_at(id, 1, key, updated_at)), 0);
    }
    // (from, to, expected number of type-1 nodes)
    let cases = [
        (100, 300, 3),
        (100, 200, 2),
        (200, 200, 1),
        (250, 250, 0),
        (300, 100, 0),
    ];
    for (from, to, expected) in cases {
        assert_eq!(mt.nodes_by_time_range(1, from, to).len(), expected);
    }
}
/// Re-upserting a node with a new `updated_at` replaces its time-index entry rather
/// than adding a second one.
#[test]
fn test_time_index_update_moves_entry() {
    let mut mt = Memtable::new();
    let count_in = |mt: &Memtable, from, to| mt.nodes_by_time_range(1, from, to).len();

    mt.apply_op(&WalOp::UpsertNode(make_node_at(1, 1, "a", 100)), 0);
    assert_eq!(count_in(&mt, 50, 150), 1);
    assert_eq!(count_in(&mt, 150, 300), 0);

    mt.apply_op(&WalOp::UpsertNode(make_node_at(1, 1, "a", 200)), 0);
    assert_eq!(mt.time_node_index_len(), 1);
    assert_eq!(count_in(&mt, 50, 150), 0);
    assert_eq!(count_in(&mt, 150, 300), 1);
}
/// Deleting a node removes its time-index entry and leaves the others queryable.
#[test]
fn test_time_index_delete_removes_entry() {
    let mut mt = Memtable::new();
    for (id, key, updated_at) in [(1, "a", 100), (2, "b", 200)] {
        mt.apply_op(&WalOp::UpsertNode(make_node_at(id, 1, key, updated_at)), 0);
    }
    assert_eq!(mt.time_node_index_len(), 2);

    mt.apply_op(&WalOp::DeleteNode { id: 1, deleted_at: 9999 }, 0);
    assert_eq!(mt.time_node_index_len(), 1);
    assert_eq!(mt.nodes_by_time_range(1, 0, 300).len(), 1);
    assert!(mt.nodes_by_time_range(1, 0, 300).contains(&2));
}
/// Each upserted record stores the seq it was applied with, in `last_write_seq`.
#[test]
fn test_apply_op_stores_last_write_seq() {
    let mut mt = Memtable::new();
    let node_op = WalOp::UpsertNode(make_node(1, 1, "a"));
    let edge_op = WalOp::UpsertEdge(make_edge(1, 1, 2, 1));
    mt.apply_op(&node_op, 10);
    mt.apply_op(&edge_op, 11);

    let node = mt.get_node(1).unwrap();
    assert_eq!(node.last_write_seq, 10);
    let edge = mt.get_edge(1).unwrap();
    assert_eq!(edge.last_write_seq, 11);
}
/// Node and edge tombstones record both the supplied `deleted_at` and the seq passed
/// to `apply_op` for the delete.
#[test]
fn test_tombstone_carries_seq() {
    let mut mt = Memtable::new();

    mt.apply_op(&WalOp::UpsertNode(make_node(1, 1, "a")), 5);
    mt.apply_op(&WalOp::DeleteNode { id: 1, deleted_at: 9999 }, 6);
    let node_tombstone = mt.deleted_nodes().get(&1).unwrap();
    assert_eq!(node_tombstone.deleted_at, 9999);
    assert_eq!(node_tombstone.last_write_seq, 6);

    mt.apply_op(&WalOp::UpsertEdge(make_edge(1, 1, 2, 1)), 7);
    mt.apply_op(&WalOp::DeleteEdge { id: 1, deleted_at: 8888 }, 8);
    let edge_tombstone = mt.deleted_edges().get(&1).unwrap();
    assert_eq!(edge_tombstone.deleted_at, 8888);
    assert_eq!(edge_tombstone.last_write_seq, 8);
}
/// A later upsert of the same node overwrites its stored `last_write_seq`.
#[test]
fn test_upsert_updates_last_write_seq() {
    let mut mt = Memtable::new();
    let upsert = WalOp::UpsertNode(make_node(1, 1, "a"));
    let stored_seq = |mt: &Memtable| mt.get_node(1).unwrap().last_write_seq;

    mt.apply_op(&upsert, 10);
    assert_eq!(stored_seq(&mt), 10);
    mt.apply_op(&upsert, 20);
    assert_eq!(stored_seq(&mt), 20);
}
/// Registering indexes after nodes exist seeds them from those nodes. The
/// equality index takes both nodes. The Int range index takes only the node whose
/// `age` is an Int and skips the one whose `age` is a String.
#[test]
fn test_register_secondary_index_seeds_existing_nodes_and_skips_incompatible_range_values() {
    let mut mt = Memtable::new();
    let props_with_age = |age: PropValue| {
        BTreeMap::from([
            (
                "status".to_string(),
                PropValue::String("active".to_string()),
            ),
            ("age".to_string(), age),
        ])
    };

    let first = make_node_with_props(1, 1, "a", props_with_age(PropValue::Int(42)));
    mt.apply_op(&WalOp::UpsertNode(first), 1);
    let old_age = PropValue::String("old".to_string());
    let second = make_node_with_props(2, 1, "b", props_with_age(old_age));
    mt.apply_op(&WalOp::UpsertNode(second), 2);

    let status_index = SecondaryIndexManifestEntry {
        index_id: 10,
        target: SecondaryIndexTarget::NodeProperty {
            type_id: 1,
            prop_key: "status".to_string(),
        },
        kind: SecondaryIndexKind::Equality,
        state: SecondaryIndexState::Building,
        last_error: None,
    };
    let age_index = SecondaryIndexManifestEntry {
        index_id: 11,
        target: SecondaryIndexTarget::NodeProperty {
            type_id: 1,
            prop_key: "age".to_string(),
        },
        kind: SecondaryIndexKind::Range {
            domain: SecondaryIndexRangeDomain::Int,
        },
        state: SecondaryIndexState::Building,
        last_error: None,
    };
    mt.register_secondary_index(&status_index);
    mt.register_secondary_index(&age_index);

    let active_hash = hash_prop_value(&PropValue::String("active".to_string()));
    let eq_ids = mt
        .secondary_eq_state()
        .get(&10)
        .unwrap()
        .get(&active_hash)
        .unwrap();
    assert!(eq_ids.contains(&1));
    assert!(eq_ids.contains(&2));

    let range_entries = mt.secondary_range_state().get(&11).unwrap();
    assert!(range_entries.contains(&(encode_signed_range_key(42), 1)));
    assert_eq!(range_entries.len(), 1);
}
/// A type-scoped range index follows its node across upserts. The entry disappears
/// when the node moves to another type, returns with the new value when it moves
/// back, and disappears again on delete.
#[test]
fn test_secondary_index_state_updates_on_type_change_and_delete() {
    let index_id = 20;
    let mut mt = Memtable::new();
    mt.register_secondary_index(&SecondaryIndexManifestEntry {
        index_id,
        target: SecondaryIndexTarget::NodeProperty {
            type_id: 1,
            prop_key: "age".to_string(),
        },
        kind: SecondaryIndexKind::Range {
            domain: SecondaryIndexRangeDomain::Int,
        },
        state: SecondaryIndexState::Building,
        last_error: None,
    });
    // Upserts node 1 (key "a") with the given type and Int `age`, at the given seq.
    let upsert_with_age = |mt: &mut Memtable, type_id, age, seq| {
        let props = BTreeMap::from([("age".to_string(), PropValue::Int(age))]);
        mt.apply_op(
            &WalOp::UpsertNode(make_node_with_props(1, type_id, "a", props)),
            seq,
        );
    };

    upsert_with_age(&mut mt, 1, 10, 1);
    assert!(mt
        .secondary_range_state()
        .get(&index_id)
        .unwrap()
        .contains(&(encode_signed_range_key(10), 1)));

    // Type 2 is outside the index's target, so the entry must go away.
    upsert_with_age(&mut mt, 2, 11, 2);
    assert!(mt
        .secondary_range_state()
        .get(&index_id)
        .is_none_or(|entries| entries.is_empty()));

    upsert_with_age(&mut mt, 1, 12, 3);
    assert!(mt
        .secondary_range_state()
        .get(&index_id)
        .unwrap()
        .contains(&(encode_signed_range_key(12), 1)));

    mt.apply_op(&WalOp::DeleteNode { id: 1, deleted_at: 999 }, 4);
    assert!(mt
        .secondary_range_state()
        .get(&index_id)
        .is_none_or(|entries| entries.is_empty()));
}
}