use std::collections::HashSet;
use selene_core::NodeId;
use crate::error::{GraphError, GraphResult};
use crate::graph::{
CompositePropertyIndexEntry, PropertyIndexEntry, SeleneGraph, VectorIndexEntry,
};
use crate::store::{EdgeStore, NodeStore, RowIndex};
use crate::typed_index::TypedIndex;
/// Scale factor used when expressing the reclaimable/allocated row ratio in
/// basis points: a ratio of 1.0 maps to `10_000`.
const BASIS_POINTS_DENOMINATOR: u64 = 10_000;
/// Minimum number of reclaimable rows (nodes plus edges) required before
/// [`CompactionStats::compaction_recommended`] can return `true`.
pub const COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS: u64 = 1_024;
/// Minimum reclaimable share of allocated rows, in basis points (`2_500` is
/// 25%), required before [`CompactionStats::compaction_recommended`] can
/// return `true`.
pub const COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS: u64 = 2_500;
/// Summary of how many rows a compaction pass dropped.
///
/// `compact_core` fills this from the source graph's reclaimable row counts
/// (allocated rows minus live rows).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CompactionReport {
/// Node rows that were allocated but not live in the source graph.
pub reclaimed_nodes: u64,
/// Edge rows that were allocated but not live in the source graph.
pub reclaimed_edges: u64,
}
/// Row-occupancy counters for a graph, used to decide whether compaction is
/// worthwhile.
///
/// The field descriptions below reflect how [`CompactionStats::from_graph`]
/// populates them; all fields are public, so hand-built values are not
/// guaranteed to be mutually consistent.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CompactionStats {
/// Length of the node store, including rows that are no longer live.
pub allocated_nodes: u64,
/// Number of live nodes as reported by the graph.
pub live_nodes: u64,
/// `allocated_nodes - live_nodes`, saturating at zero.
pub reclaimable_nodes: u64,
/// Length of the edge store, including rows that are no longer live.
pub allocated_edges: u64,
/// Number of live edges as reported by the graph.
pub live_edges: u64,
/// `allocated_edges - live_edges`, saturating at zero.
pub reclaimable_edges: u64,
}
impl CompactionStats {
    /// Captures the current row occupancy of `graph`.
    ///
    /// Allocated counts come from the store lengths, live counts from
    /// `node_count()` / `edge_count()`, and reclaimable counts are the
    /// difference between the two, saturating at zero.
    #[must_use]
    pub fn from_graph(graph: &SeleneGraph) -> Self {
        let allocated_nodes = graph.node_store.len() as u64;
        let live_nodes = graph.node_count() as u64;
        let allocated_edges = graph.edge_store.len() as u64;
        let live_edges = graph.edge_count() as u64;
        Self {
            allocated_nodes,
            live_nodes,
            reclaimable_nodes: allocated_nodes.saturating_sub(live_nodes),
            allocated_edges,
            live_edges,
            reclaimable_edges: allocated_edges.saturating_sub(live_edges),
        }
    }

    /// Total allocated rows (nodes plus edges), saturating at `u64::MAX`.
    #[must_use]
    pub const fn allocated_rows(self) -> u64 {
        self.allocated_nodes.saturating_add(self.allocated_edges)
    }

    /// Total live rows (nodes plus edges), saturating at `u64::MAX`.
    #[must_use]
    pub const fn live_rows(self) -> u64 {
        self.live_nodes.saturating_add(self.live_edges)
    }

    /// Total reclaimable rows (nodes plus edges), saturating at `u64::MAX`.
    #[must_use]
    pub const fn reclaimable_rows(self) -> u64 {
        self.reclaimable_nodes
            .saturating_add(self.reclaimable_edges)
    }

    /// Returns `true` when neither store has any reclaimable row.
    #[must_use]
    pub const fn is_dense(self) -> bool {
        self.reclaimable_nodes == 0 && self.reclaimable_edges == 0
    }

    /// Reclaimable rows as a share of allocated rows, in basis points
    /// (`10_000` means every allocated row is reclaimable).
    ///
    /// Returns `0` when nothing is allocated. The result saturates at
    /// `u64::MAX` instead of wrapping if the counters are inconsistent
    /// (more reclaimable than allocated rows) and the ratio overflows `u64`.
    #[must_use]
    pub fn reclaimable_row_basis_points(self) -> u64 {
        let allocated = self.allocated_rows();
        if allocated == 0 {
            return 0;
        }
        // Multiply in u128 so `reclaimable * 10_000` cannot overflow.
        let scaled = u128::from(self.reclaimable_rows()) * u128::from(BASIS_POINTS_DENOMINATOR);
        let ratio = scaled / u128::from(allocated);
        // Clamp first so the narrowing cast below is lossless.
        ratio.min(u128::from(u64::MAX)) as u64
    }

    /// Returns `true` when both thresholds are met: at least
    /// `COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS` reclaimable rows and a
    /// reclaimable share of at least
    /// `COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS`.
    #[must_use]
    pub fn compaction_recommended(self) -> bool {
        self.reclaimable_rows() >= COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS
            && self.reclaimable_row_basis_points()
                >= COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS
    }
}
/// Result of `compact_core`: the rebuilt graph plus a summary of what was dropped.
pub struct CompactedCore {
/// The densely re-rowed copy of the source graph.
pub graph: SeleneGraph,
/// Counts of node and edge rows that were not carried over.
pub report: CompactionReport,
}
pub fn compact_core(graph: &SeleneGraph) -> GraphResult<CompactedCore> {
let mut nodes = NodeStore::new();
let mut live_nodes: HashSet<NodeId> =
HashSet::with_capacity(graph.node_store.alive.len() as usize);
for old_row in graph.node_store.alive.iter() {
let r = old_row as usize;
let id = graph
.node_id_for_row(RowIndex::new(old_row))
.ok_or_else(|| GraphError::Inconsistent {
reason: format!("alive node row {old_row} has no external id during compaction"),
})?;
let column_missing = |col: &str| GraphError::Inconsistent {
reason: format!("node {col} column missing live row {old_row} during compaction"),
};
nodes.labels.push(
graph
.node_store
.labels
.get(r)
.cloned()
.ok_or_else(|| column_missing("labels"))?,
);
nodes.properties.push(
graph
.node_store
.properties
.get(r)
.cloned()
.ok_or_else(|| column_missing("properties"))?,
);
nodes.row_to_id.push(id);
live_nodes.insert(id);
}
let node_len = nodes.labels.len() as u32;
for new_row in 0..node_len {
nodes.alive_mut().insert(new_row);
}
let mut edges = EdgeStore::new();
for old_row in graph.edge_store.alive.iter() {
let r = old_row as usize;
let id = graph
.edge_id_for_row(RowIndex::new(old_row))
.ok_or_else(|| GraphError::Inconsistent {
reason: format!("alive edge row {old_row} has no external id during compaction"),
})?;
let column_missing = |col: &str| GraphError::Inconsistent {
reason: format!("edge {col} column missing live row {old_row} during compaction"),
};
let source = graph
.edge_store
.source
.get(r)
.copied()
.ok_or_else(|| column_missing("source"))?;
let target = graph
.edge_store
.target
.get(r)
.copied()
.ok_or_else(|| column_missing("target"))?;
if !live_nodes.contains(&source) || !live_nodes.contains(&target) {
return Err(GraphError::Inconsistent {
reason: format!(
"edge {id} survives compaction but endpoint {source} or {target} does not"
),
});
}
edges.label.push(
graph
.edge_store
.label
.get(r)
.cloned()
.ok_or_else(|| column_missing("label"))?,
);
edges.source.push(source);
edges.target.push(target);
edges.properties.push(
graph
.edge_store
.properties
.get(r)
.cloned()
.ok_or_else(|| column_missing("properties"))?,
);
edges.row_to_id.push(id);
}
let edge_len = edges.label.len() as u32;
for new_row in 0..edge_len {
edges.alive_mut().insert(new_row);
}
let stats = CompactionStats::from_graph(graph);
let report = CompactionReport {
reclaimed_nodes: stats.reclaimable_nodes,
reclaimed_edges: stats.reclaimable_edges,
};
let mut dense = SeleneGraph::new(graph.meta.graph_id);
dense.meta = graph.meta.clone();
dense.node_store = nodes;
dense.edge_store = edges;
for ((label, property), entry) in &graph.property_index {
dense.property_index.insert(
(label.clone(), property.clone()),
PropertyIndexEntry::new(TypedIndex::new(entry.kind()), entry.name.clone()),
);
}
for (key, entry) in &graph.composite_property_index {
dense.composite_property_index.insert(
key.clone(),
CompositePropertyIndexEntry::new(
crate::CompositeTypedIndex::new(entry.kinds()),
entry.declared_properties.clone(),
entry.name.clone(),
),
);
}
for ((label, property), entry) in &graph.vector_index {
dense.vector_index.insert(
(label.clone(), property.clone()),
VectorIndexEntry::new(
crate::VectorIndex::new_with_configs(
entry.kind(),
entry.dimension(),
entry.hnsw_config(),
entry.ivf_config(),
)?,
entry.name.clone(),
),
);
}
for ((label, property), entry) in &graph.text_index {
dense.text_index.insert(
(label.clone(), property.clone()),
crate::graph::TextIndexEntry::new(
crate::TextIndex::empty(label.clone(), property.clone()),
entry.name.clone(),
),
);
}
crate::shared::rebuild_derived_state(&mut dense)?;
crate::property_index::rebuild_property_indexes(&mut dense)?;
crate::composite_property_index::rebuild_composite_property_indexes(&mut dense)?;
crate::vector_index::rebuild_vector_indexes(&mut dense)?;
crate::text_index::rebuild_text_indexes(&mut dense)?;
#[cfg(debug_assertions)]
if let Err(reason) = dense.assert_indexes_consistent() {
return Err(GraphError::Inconsistent {
reason: format!("compacted graph failed consistency check: {reason}"),
});
}
Ok(CompactedCore {
graph: dense,
report,
})
}
// Test modules are declared out of line and compiled only for test builds.
#[cfg(test)]
mod schema_tests;
#[cfg(test)]
mod tests;