use super::wal_codec::{bytes_to_u64_list, decode_typed_edge_delta_payload, TypedEdgeDeltaOp};
use super::*;
/// Builds the bitmap value key used for a typed inbound edge.
///
/// The key is the whitespace-trimmed relation type, followed by the ASCII
/// unit separator (U+001F), followed by the decimal target node id.
fn typed_inbound_edge_value_key(rel_type: &str, target_id: u64) -> String {
    let relation = rel_type.trim();
    let id_text = target_id.to_string();
    let mut key = String::with_capacity(relation.len() + 1 + id_text.len());
    key.push_str(relation);
    key.push('\u{1f}');
    key.push_str(&id_text);
    key
}
/// Ensures the internal inbound-adjacency bitmap index exists, building it on
/// first use.
///
/// If the manifest already lists the index this is a no-op. Otherwise the
/// index is registered, the newest `FullNode` entry per key is selected from
/// the tables currently held in `handle.sstable_cache`, and for every
/// `(source, destination)` pair in those adjacency lists a posting
/// `destination -> source` is added. The manifest is then persisted and the
/// bitmap store flushed.
///
/// # Errors
///
/// Propagates any error from registering the index, adding postings,
/// persisting the manifest, or flushing the bitmap store.
pub(crate) fn initialize_inbound_adjacency_index(handle: &mut StorageHandle) -> Result<()> {
    if handle
        .manifest
        .has_bitmap_index(INTERNAL_INBOUND_ADJACENCY_INDEX)
    {
        return Ok(());
    }
    handle.manifest.register_bitmap_index(
        INTERNAL_INBOUND_ADJACENCY_INDEX,
        INTERNAL_INBOUND_ADJACENCY_FIELD,
    )?;

    // NOTE(review): only tables present in `sstable_cache` are scanned here;
    // whether that covers every persisted run depends on how the cache is
    // populated elsewhere — confirm.
    let mut latest_full_nodes = std::collections::HashMap::<u64, (u64, Vec<u64>)>::new();
    for table in handle.sstable_cache.values() {
        for entry in &table.entries {
            if entry.kind != sstable::EntryKind::FullNode {
                continue;
            }
            // Decode the adjacency list only when this entry actually becomes
            // the newest one for its key; stale entries are skipped undecoded.
            // On equal versions the first entry seen is kept.
            match latest_full_nodes.entry(entry.key) {
                std::collections::hash_map::Entry::Occupied(mut slot) => {
                    if slot.get().0 < entry.version {
                        slot.insert((entry.version, bytes_to_u64_list(&entry.value)));
                    }
                }
                std::collections::hash_map::Entry::Vacant(slot) => {
                    slot.insert((entry.version, bytes_to_u64_list(&entry.value)));
                }
            }
        }
    }

    for (src_node_id, (_, adjacency)) in latest_full_nodes {
        for dst_node_id in adjacency {
            // Inbound index: keyed by the destination, posting the source.
            handle.bitmap_store.add_posting(
                INTERNAL_INBOUND_ADJACENCY_INDEX,
                &dst_node_id.to_string(),
                src_node_id,
            )?;
        }
    }

    handle.manifest.persist()?;
    handle
        .bitmap_store
        .flush_with_reactor(handle.reactor.as_ref())?;
    Ok(())
}
pub(crate) fn initialize_typed_inbound_edge_index(handle: &mut StorageHandle) -> Result<()> {
if handle
.manifest
.has_bitmap_index(INTERNAL_TYPED_INBOUND_EDGE_INDEX)
{
return Ok(());
}
handle.manifest.register_bitmap_index(
INTERNAL_TYPED_INBOUND_EDGE_INDEX,
INTERNAL_TYPED_INBOUND_EDGE_FIELD,
)?;
let mut latest_edges =
std::collections::HashMap::<(u64, u64, String), (u64, TypedEdgeDeltaOp)>::new();
for table in handle.sstable_cache.values() {
for entry in &table.entries {
if entry.kind != sstable::EntryKind::EdgeDelta {
continue;
}
let Some(payload) = decode_typed_edge_delta_payload(&entry.value) else {
continue;
};
let key = (entry.key, payload.target_id, payload.rel_type.clone());
match latest_edges.get(&key) {
Some((version, _)) if *version >= entry.version => {}
_ => {
latest_edges.insert(key, (entry.version, payload.op));
}
}
}
}
for ((src_node_id, target_id, rel_type), (_, op)) in latest_edges {
if op == TypedEdgeDeltaOp::Add {
handle.bitmap_store.add_posting(
INTERNAL_TYPED_INBOUND_EDGE_INDEX,
&typed_inbound_edge_value_key(&rel_type, target_id),
src_node_id,
)?;
}
}
handle.manifest.persist()?;
handle
.bitmap_store
.flush_with_reactor(handle.reactor.as_ref())?;
Ok(())
}
/// Applies a single typed edge add/delete to the typed-inbound-edge index.
///
/// The index is initialized first if it does not exist yet. The posting key
/// is derived from `rel_type` and `target_id`; `src_node_id` is the posted
/// value that is added or removed.
///
/// # Errors
///
/// Propagates errors from index initialization and from the bitmap store.
pub(crate) fn apply_typed_inbound_edge_delta(
    handle: &mut StorageHandle,
    src_node_id: u64,
    target_id: u64,
    rel_type: &str,
    op: TypedEdgeDeltaOp,
) -> Result<()> {
    initialize_typed_inbound_edge_index(handle)?;

    let posting_key = typed_inbound_edge_value_key(rel_type, target_id);
    let postings = &mut handle.bitmap_store;
    match op {
        TypedEdgeDeltaOp::Add => {
            postings.add_posting(INTERNAL_TYPED_INBOUND_EDGE_INDEX, &posting_key, src_node_id)?;
        }
        TypedEdgeDeltaOp::Delete => {
            postings.remove_posting(
                INTERNAL_TYPED_INBOUND_EDGE_INDEX,
                &posting_key,
                src_node_id,
            )?;
        }
    }
    Ok(())
}
/// Brings the inbound-adjacency index in line with a node's new adjacency list.
///
/// The index is initialized first if needed. Targets present in
/// `old_adjacency` but not in `new_adjacency` lose their posting for
/// `src_node_id`; targets present only in `new_adjacency` gain one. Removals
/// are issued before additions, each in ascending target order. Duplicate ids
/// within either slice are collapsed.
///
/// # Errors
///
/// Propagates errors from index initialization and from the bitmap store.
pub(crate) fn update_inbound_adjacency_for_node(
    handle: &mut StorageHandle,
    src_node_id: u64,
    old_adjacency: &[u64],
    new_adjacency: &[u64],
) -> Result<()> {
    initialize_inbound_adjacency_index(handle)?;

    let before: std::collections::BTreeSet<u64> = old_adjacency.iter().copied().collect();
    let after: std::collections::BTreeSet<u64> = new_adjacency.iter().copied().collect();

    // Targets that were dropped from the adjacency list.
    for dropped in before.iter().filter(|id| !after.contains(*id)) {
        handle.bitmap_store.remove_posting(
            INTERNAL_INBOUND_ADJACENCY_INDEX,
            &dropped.to_string(),
            src_node_id,
        )?;
    }

    // Targets that are new in the adjacency list.
    for gained in after.iter().filter(|id| !before.contains(*id)) {
        handle.bitmap_store.add_posting(
            INTERNAL_INBOUND_ADJACENCY_INDEX,
            &gained.to_string(),
            src_node_id,
        )?;
    }

    Ok(())
}
/// Returns `true` when `node_id` is present in the handle's
/// `embedding_pending_nodes` collection.
pub fn node_is_embedding_pending(handle: &StorageHandle, node_id: u64) -> bool {
    let pending = &handle.embedding_pending_nodes;
    pending.contains(&node_id)
}
/// Returns `true` when `node_id` is present in the handle's
/// `tombstoned_node_ids` collection.
pub fn node_is_tombstoned(handle: &StorageHandle, node_id: u64) -> bool {
    let tombstones = &handle.tombstoned_node_ids;
    tombstones.contains(&node_id)
}
/// Registers a bitmap index named `index_name` over `field_path` in the
/// manifest and persists the manifest.
///
/// # Errors
///
/// Propagates errors from registering the index or persisting the manifest.
pub fn create_bitmap_index(
    handle: &mut StorageHandle,
    index_name: &str,
    field_path: &str,
) -> Result<()> {
    let manifest = &mut handle.manifest;
    manifest.register_bitmap_index(index_name, field_path)?;
    manifest.persist()?;
    Ok(())
}
/// Returns the bitmap index descriptors reported by the manifest.
pub fn list_bitmap_indexes(handle: &StorageHandle) -> Vec<manifest::BitmapIndexDescriptor> {
    let manifest = &handle.manifest;
    manifest.bitmap_indexes()
}
/// Adds `node_id` as a posting under `value_key` in the index `index_name`.
///
/// Delegates to `apply_bitmap_posting` with its final flag set to `true`
/// (presumably "add" as opposed to "remove" — confirm against that helper).
///
/// # Errors
///
/// Propagates any error returned by `apply_bitmap_posting`.
pub fn bitmap_add_posting(
    handle: &mut StorageHandle,
    index_name: &str,
    value_key: &str,
    node_id: u64,
) -> Result<()> {
    let _ = apply_bitmap_posting(handle, index_name, value_key, node_id, true)?;
    Ok(())
}
/// Returns the postings stored under `value_key` in the index `index_name`,
/// as reported by the bitmap store.
pub fn bitmap_postings(handle: &StorageHandle, index_name: &str, value_key: &str) -> Vec<u64> {
    let store = &handle.bitmap_store;
    store.postings(index_name, value_key)
}
/// Returns postings under `value_key` in `index_name`, restricted to the
/// range `range_start..range_end_exclusive` and optionally capped by `limit`.
///
/// This is a thin pass-through to the bitmap store's
/// `postings_in_range_limit`; range and limit semantics are defined there.
pub fn bitmap_postings_in_range_limit(
    handle: &StorageHandle,
    index_name: &str,
    value_key: &str,
    range_start: u64,
    range_end_exclusive: u64,
    limit: Option<usize>,
) -> Vec<u64> {
    let store = &handle.bitmap_store;
    store.postings_in_range_limit(index_name, value_key, range_start, range_end_exclusive, limit)
}
/// Returns the postings in `range_start..range_end_exclusive` that belong to
/// `request`, capped at `limit` results when a limit is given.
///
/// Ownership is decided by `request_owns_node`. The limit counts only nodes
/// that pass that filter, so up to `limit` owned nodes are returned whenever
/// that many exist in the range.
pub fn bitmap_postings_in_range_limit_for_request(
    handle: &StorageHandle,
    index_name: &str,
    value_key: &str,
    range_start: u64,
    range_end_exclusive: u64,
    limit: Option<usize>,
    request: &ThreadCoreRequest,
) -> Vec<u64> {
    // Fetch without a limit. Capping the raw postings before the ownership
    // filter would let nodes owned by other requests use up the budget and
    // leave this request with fewer than `limit` results.
    let candidates = bitmap_postings_in_range_limit(
        handle,
        index_name,
        value_key,
        range_start,
        range_end_exclusive,
        None,
    );
    candidates
        .into_iter()
        .filter(|&node_id| request_owns_node(request, node_id))
        .take(limit.unwrap_or(usize::MAX))
        .collect()
}
/// Returns the postings stored for `node_id` in the internal
/// inbound-adjacency index, i.e. the source nodes recorded as pointing at it.
pub fn get_inbound_neighbors(handle: &StorageHandle, node_id: u64) -> Vec<u64> {
    let posting_key = node_id.to_string();
    handle
        .bitmap_store
        .postings(INTERNAL_INBOUND_ADJACENCY_INDEX, &posting_key)
}
/// Returns the source nodes recorded in the typed-inbound-edge index as
/// pointing at `node_id` through any of `rel_types`.
///
/// Results are ordered by relation type (in the order given), then by the
/// order in which the bitmap store yields postings. A source reachable
/// through several relation types appears only once, at its first occurrence.
pub fn get_typed_inbound_neighbors(
    handle: &StorageHandle,
    node_id: u64,
    rel_types: &[&str],
) -> Vec<u64> {
    let mut already_emitted = std::collections::HashSet::new();
    rel_types
        .iter()
        .flat_map(|rel_type| {
            handle.bitmap_store.postings(
                INTERNAL_TYPED_INBOUND_EDGE_INDEX,
                &typed_inbound_edge_value_key(rel_type, node_id),
            )
        })
        // `insert` returns false for ids already seen, which drops duplicates.
        .filter(|src_node_id| already_emitted.insert(*src_node_id))
        .collect()
}
pub fn get_typed_outbound_neighbors(
handle: &mut StorageHandle,
node_id: u64,
rel_types: &[&str],
) -> Result<Vec<u64>> {
let logical = get_logical_node(handle, node_id)?;
let rel_type_set = rel_types
.iter()
.map(|value| value.trim())
.collect::<std::collections::BTreeSet<_>>();
let mut latest_edges =
std::collections::HashMap::<(u64, String), (u64, TypedEdgeDeltaOp)>::new();
for entry in &logical.deltas {
if entry.kind != sstable::EntryKind::EdgeDelta {
continue;
}
let Some(payload) = decode_typed_edge_delta_payload(&entry.value) else {
continue;
};
if !rel_type_set.contains(payload.rel_type.trim()) {
continue;
}
let key = (payload.target_id, payload.rel_type);
match latest_edges.get(&key) {
Some((version, _)) if *version >= entry.version => {}
_ => {
latest_edges.insert(key, (entry.version, payload.op));
}
}
}
let mut neighbors = latest_edges
.into_iter()
.filter_map(|((target_id, _), (_, op))| (op == TypedEdgeDeltaOp::Add).then_some(target_id))
.collect::<Vec<_>>();
neighbors.sort_unstable();
Ok(neighbors)
}
/// Assembles the logical view of `node_id`: its newest full-node entry plus
/// the non-full entries whose version is at least that full entry's version.
///
/// A cached logical node is returned directly when available. Otherwise
/// entries are gathered from the memtable and then from each L0 run, walking
/// `handle.l0_runs` in reverse order. When no entries exist an empty
/// `LogicalNode` is returned and nothing is cached. Otherwise the bytes of the
/// selected entries are added to `metrics.logical_bytes_read` and the result
/// is stored in the logical-node cache.
///
/// # Errors
///
/// Propagates any error from reading an L0 run.
pub fn get_logical_node(handle: &mut StorageHandle, node_id: u64) -> Result<LogicalNode> {
    if let Some(hit) = cache_get_logical_node(handle, node_id) {
        return Ok(hit);
    }

    // Memtable entries first, then every run in reverse `l0_runs` order.
    let mut collected: Vec<sstable::Entry> = handle.memtable.entries_for_key(node_id);
    let reversed_runs: Vec<std::path::PathBuf> = handle.l0_runs.iter().rev().cloned().collect();
    for run_path in reversed_runs {
        collected.extend(read_entries_for_key_from_run(handle, &run_path, node_id)?);
    }

    if collected.is_empty() {
        return Ok(LogicalNode {
            node_id,
            full: None,
            deltas: Vec::new(),
        });
    }

    // Highest-version full node; on equal versions the first one seen wins.
    let mut newest_full: Option<&sstable::Entry> = None;
    for candidate in &collected {
        if candidate.kind != sstable::EntryKind::FullNode {
            continue;
        }
        let supersedes = match newest_full {
            Some(current) => current.version < candidate.version,
            None => true,
        };
        if supersedes {
            newest_full = Some(candidate);
        }
    }
    let full = newest_full.cloned();

    // Without a full node every non-full entry qualifies (floor of 0).
    let version_floor = full.as_ref().map_or(0, |entry| entry.version);
    let deltas: Vec<sstable::Entry> = collected
        .into_iter()
        .filter(|entry| {
            entry.kind != sstable::EntryKind::FullNode && entry.version >= version_floor
        })
        .collect();

    let bytes_read: u64 = full
        .iter()
        .chain(deltas.iter())
        .map(|entry| entry.value.len() as u64)
        .sum();
    handle.metrics.logical_bytes_read += bytes_read;

    let logical = LogicalNode {
        node_id,
        full,
        deltas,
    };
    cache_put_logical_node(handle, node_id, logical.clone());
    Ok(logical)
}
/// Request-scoped variant of `get_logical_node`.
///
/// When `request_owns_node` reports that `request` does not own `node_id`,
/// an empty `LogicalNode` (no full entry, no deltas) is returned without
/// touching storage; otherwise the call is forwarded to `get_logical_node`.
///
/// # Errors
///
/// Propagates any error from `get_logical_node`.
pub fn get_logical_node_for_request(
    handle: &mut StorageHandle,
    node_id: u64,
    request: &ThreadCoreRequest,
) -> Result<LogicalNode> {
    if request_owns_node(request, node_id) {
        return get_logical_node(handle, node_id);
    }
    Ok(LogicalNode {
        node_id,
        full: None,
        deltas: Vec::new(),
    })
}
/// Produces a lightweight summary of the stored row for `node_id`.
///
/// When a logical node is cached, the summary is derived from it. Otherwise
/// entries are gathered from the memtable and each L0 run (walking
/// `handle.l0_runs` in reverse order) and summarized without building a
/// `LogicalNode`:
///
/// * `has_full`: whether any full-node entry exists;
/// * `adjacency_degree`: byte length of the newest full-node value divided
///   by 8 (presumably one `u64` per neighbor — confirm against the encoder);
/// * `delta_count`: number of non-full entries whose version is at least the
///   newest full node's version (or 0 when there is none).
///
/// Returns `Ok(None)` when no entries exist for the node. The bytes of the
/// counted entries are added to `metrics.logical_bytes_read`.
///
/// # Errors
///
/// Propagates any error from reading an L0 run.
pub fn get_node_row_summary(
    handle: &mut StorageHandle,
    node_id: u64,
) -> Result<Option<NodeRowSummary>> {
    if let Some(hit) = cache_get_logical_node(handle, node_id) {
        return Ok(Some(node_row_summary_from_logical(&hit)));
    }

    let mut collected: Vec<sstable::Entry> = Vec::new();
    collected.extend(handle.memtable.entries_for_key(node_id));
    let reversed_runs: Vec<std::path::PathBuf> = handle.l0_runs.iter().rev().cloned().collect();
    for run_path in reversed_runs {
        collected.extend(read_entries_for_key_from_run(handle, &run_path, node_id)?);
    }
    if collected.is_empty() {
        return Ok(None);
    }

    // Highest-version full node; on equal versions the first one seen wins.
    let mut newest_full: Option<&sstable::Entry> = None;
    for candidate in &collected {
        if candidate.kind != sstable::EntryKind::FullNode {
            continue;
        }
        let supersedes = match newest_full {
            Some(current) => candidate.version > current.version,
            None => true,
        };
        if supersedes {
            newest_full = Some(candidate);
        }
    }

    let has_full = newest_full.is_some();
    let full_bytes = newest_full.map_or(0_usize, |entry| entry.value.len());
    let adjacency_degree = full_bytes / 8;
    let version_floor = newest_full.map_or(0_u64, |entry| entry.version);
    if full_bytes > 0 {
        handle.metrics.logical_bytes_read += full_bytes as u64;
    }

    let mut delta_count = 0_usize;
    let qualifying = collected.iter().filter(|entry| {
        entry.kind != sstable::EntryKind::FullNode && entry.version >= version_floor
    });
    for delta in qualifying {
        delta_count += 1;
        handle.metrics.logical_bytes_read += delta.value.len() as u64;
    }

    Ok(Some(NodeRowSummary {
        has_full,
        delta_count,
        adjacency_degree,
    }))
}
/// Request-scoped variant of `get_node_row_summary`.
///
/// Returns `Ok(None)` without touching storage when `request_owns_node`
/// reports that `request` does not own `node_id`; otherwise forwards to
/// `get_node_row_summary`.
///
/// # Errors
///
/// Propagates any error from `get_node_row_summary`.
pub fn get_node_row_summary_for_request(
    handle: &mut StorageHandle,
    node_id: u64,
    request: &ThreadCoreRequest,
) -> Result<Option<NodeRowSummary>> {
    if request_owns_node(request, node_id) {
        get_node_row_summary(handle, node_id)
    } else {
        Ok(None)
    }
}
/// Returns the entries stored for `key` in the run at `path`.
///
/// A table already present in `handle.sstable_cache` is used directly.
/// Otherwise the table is read through the reactor, the file length (when the
/// reactor can report it) is added to `metrics.sstable_bytes_read`, and the
/// table is inserted into the cache before the matching entries are returned.
///
/// # Errors
///
/// Propagates any error from reading the sstable. A failure to obtain the
/// file length is ignored and only skips the metric update.
fn read_entries_for_key_from_run(
    handle: &mut StorageHandle,
    path: &std::path::Path,
    key: u64,
) -> Result<Vec<sstable::Entry>> {
    let cached_hit = handle
        .sstable_cache
        .get(path)
        .map(|table| sstable::read_entries_for_key_in_table(table, key));
    if let Some(entries) = cached_hit {
        return Ok(entries);
    }

    let loaded = sstable::read_sstable_with_reactor(path, handle.reactor.as_ref())?;
    if let Ok(file_len) = handle.reactor.metadata_len(path) {
        handle.metrics.sstable_bytes_read += file_len;
    }
    let matching = sstable::read_entries_for_key_in_table(&loaded, key);
    handle.sstable_cache.insert(path.to_path_buf(), loaded);
    Ok(matching)
}