selene_graph/compaction.rs
1//! CORE graph compaction mechanism (BRIEF-Item-4b / 4c).
2//!
3//! [`compact_core`] is a pure transform: given a [`SeleneGraph`], it builds a
4//! fresh graph whose rows are dense (every dead / aborted-tx hole row dropped),
5//! preserving external `NodeId` / `EdgeId` and the monotonic allocator
6//! high-water marks, then rebuilds all derived state from the compacted columns
7//! via the existing recovery-path rebuilders. It performs NO publication and NO
8//! snapshot I/O — the snapshot writer wiring is BRIEF-Item-4c, the
9//! create-time row-allocation change (arith → append) and dropping the
10//! `rebuild_id_maps` identity bootstrap land when a compacted graph first goes
11//! live (also 4c). Database strings are plain owned values, so compaction has no
12//! string-pool reclamation work to perform.
13//!
14//! Because 4a left edges + adjacency keyed by stable external `NodeId`, a row
15//! renumber does not touch edge endpoints or adjacency — only the row-keyed
16//! columns, the alive bitmaps, the row-indexed label/property indexes, and the
17//! id↔row maps move, and all of those are rebuilt from the compacted columns.
18//!
19//! **Cross-epoch WAL replay (BRIEF-Item-4e — RESOLVED, no new mechanism).**
20//! Compaction drops dead rows, so a *deleted* external id that gets compacted
21//! away resolves `NotFound` afterwards (was `NotAlive` under 4a's Option B). The
22//! concern was that a `Change::NodeDeleted` / `EdgeDeleted` written *before* a
23//! compaction, replayed *after* loading the compacted snapshot, would route
24//! through `require_live_*` (`recovery_state`) and hard-error on the reclaimed
25//! id. This cannot happen in the normal flow: a snapshot is published via
26//! `WalWriter::rotate_with_manifest`, which both advances the MANIFEST
27//! `live_snapshot_seq` WAL floor AND physically truncates the WAL (`set_len(0)`
28//! then a fresh header). Pre-compaction entries are therefore *gone* AND below
29//! the recovery floor (`recovery.rs` replays only `header.sequence > floor`) —
30//! they can never be replayed against a compacted snapshot. The only cross-epoch
31//! replay is of *post*-snapshot entries, which resolve against the dense rows:
32//! a post-compaction `NodeCreated` appends (BRIEF-Item-4c), and a post-compaction
33//! `NodeDeleted` of a survivor finds it in the compacted snapshot (proven by
34//! `recover_tests::nodeid_split_recovery`). The `require_live_*` hard-error is
35//! deliberately *retained* as genuine-corruption detection — a "no-op the
36//! reclaimed-id delete" alternative was rejected because it would mask a truly
37//! inconsistent WAL. The MANIFEST `compaction_epoch` field stays reserved (`0`).
38
39use std::collections::HashSet;
40
41use selene_core::NodeId;
42
43use crate::error::{GraphError, GraphResult};
44use crate::graph::{
45 CompositePropertyIndexEntry, PropertyIndexEntry, SeleneGraph, VectorIndexEntry,
46};
47use crate::store::{EdgeStore, NodeStore, RowIndex};
48use crate::typed_index::TypedIndex;
49
50const BASIS_POINTS_DENOMINATOR: u64 = 10_000;
51
52/// Minimum dead node+edge rows before compaction recommendation can fire.
53///
54/// Compaction rebuilds the whole live graph, so tiny amounts of row churn should
55/// remain explicit caller policy rather than default maintenance advice.
56pub const COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS: u64 = 1_024;
57
58/// Minimum dead-row ratio, scaled by 10,000, for compaction advice.
59pub const COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS: u64 = 2_500;
60
61/// What CORE reclaimed during a compaction pass.
62#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
63pub struct CompactionReport {
64 /// Node rows (dead + hole) dropped.
65 pub reclaimed_nodes: u64,
66 /// Edge rows (dead + hole) dropped.
67 pub reclaimed_edges: u64,
68}
69
70/// Cheap snapshot of row-space pressure before compaction.
71///
72/// Deletes clear heavyweight row payloads immediately, but the row slots and
73/// stable id mappings remain until compaction. These counters let maintenance
74/// policy decide whether a dense rebuild is worth scheduling without first
75/// performing that rebuild.
76#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
77pub struct CompactionStats {
78 /// Allocated node rows, including live rows and reclaimable dead rows.
79 pub allocated_nodes: u64,
80 /// Alive node rows.
81 pub live_nodes: u64,
82 /// Dead node rows that a compaction pass can reclaim.
83 pub reclaimable_nodes: u64,
84 /// Allocated edge rows, including live rows and reclaimable dead rows.
85 pub allocated_edges: u64,
86 /// Alive edge rows.
87 pub live_edges: u64,
88 /// Dead edge rows that a compaction pass can reclaim.
89 pub reclaimable_edges: u64,
90}
91
92impl CompactionStats {
93 /// Build compaction counters for `graph` without rebuilding any rows.
94 #[must_use]
95 pub fn from_graph(graph: &SeleneGraph) -> Self {
96 let allocated_nodes = graph.node_store.len() as u64;
97 let live_nodes = graph.node_count() as u64;
98 let allocated_edges = graph.edge_store.len() as u64;
99 let live_edges = graph.edge_count() as u64;
100 Self {
101 allocated_nodes,
102 live_nodes,
103 reclaimable_nodes: allocated_nodes.saturating_sub(live_nodes),
104 allocated_edges,
105 live_edges,
106 reclaimable_edges: allocated_edges.saturating_sub(live_edges),
107 }
108 }
109
110 /// Total allocated node and edge rows.
111 #[must_use]
112 pub const fn allocated_rows(self) -> u64 {
113 self.allocated_nodes.saturating_add(self.allocated_edges)
114 }
115
116 /// Total alive node and edge rows.
117 #[must_use]
118 pub const fn live_rows(self) -> u64 {
119 self.live_nodes.saturating_add(self.live_edges)
120 }
121
122 /// Total reclaimable dead node and edge rows.
123 #[must_use]
124 pub const fn reclaimable_rows(self) -> u64 {
125 self.reclaimable_nodes
126 .saturating_add(self.reclaimable_edges)
127 }
128
129 /// True when no dead rows remain to compact.
130 #[must_use]
131 pub const fn is_dense(self) -> bool {
132 self.reclaimable_nodes == 0 && self.reclaimable_edges == 0
133 }
134
135 /// Return reclaimable rows divided by allocated rows, scaled by 10,000.
136 #[must_use]
137 pub fn reclaimable_row_basis_points(self) -> u64 {
138 let allocated = self.allocated_rows();
139 if allocated == 0 {
140 return 0;
141 }
142 let scaled = u128::from(self.reclaimable_rows()) * u128::from(BASIS_POINTS_DENOMINATOR);
143 (scaled / u128::from(allocated)) as u64
144 }
145
146 /// Return true when current row-space pressure merits maintenance compaction.
147 ///
148 /// The recommendation is diagnostic only: reads and writes never compact
149 /// automatically, and callers still decide when to run maintenance.
150 #[must_use]
151 pub fn compaction_recommended(self) -> bool {
152 self.reclaimable_rows() >= COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS
153 && self.reclaimable_row_basis_points()
154 >= COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS
155 }
156}
157
158/// The product of compacting the CORE store.
159pub struct CompactedCore {
160 /// The dense, fully-rebuilt compacted graph (no dead/hole rows).
161 pub graph: SeleneGraph,
162 /// What CORE reclaimed.
163 pub report: CompactionReport,
164}
165
166/// Compact `graph`: drop every dead / hole row, renumber rows dense (ascending
167/// old-row order), and rebuild all derived state. External ids and the
168/// allocator high-water marks are preserved verbatim.
169///
170/// # Errors
171///
172/// Returns [`GraphError`] if an alive row has no mapped external id (a corrupt
173/// id↔row mapping) or if the rebuilt graph fails its consistency check.
174pub fn compact_core(graph: &SeleneGraph) -> GraphResult<CompactedCore> {
175 let mut nodes = NodeStore::new();
176 let mut live_nodes: HashSet<NodeId> =
177 HashSet::with_capacity(graph.node_store.alive.len() as usize);
178 for old_row in graph.node_store.alive.iter() {
179 let r = old_row as usize;
180 let id = graph
181 .node_id_for_row(RowIndex::new(old_row))
182 .ok_or_else(|| GraphError::Inconsistent {
183 reason: format!("alive node row {old_row} has no external id during compaction"),
184 })?;
185 let column_missing = |col: &str| GraphError::Inconsistent {
186 reason: format!("node {col} column missing live row {old_row} during compaction"),
187 };
188 // Strict (fail-loud) reads, symmetric with the edge columns below: a
189 // misaligned column on an alive row is corruption, not an empty row.
190 nodes.labels.push(
191 graph
192 .node_store
193 .labels
194 .get(r)
195 .cloned()
196 .ok_or_else(|| column_missing("labels"))?,
197 );
198 nodes.properties.push(
199 graph
200 .node_store
201 .properties
202 .get(r)
203 .cloned()
204 .ok_or_else(|| column_missing("properties"))?,
205 );
206 nodes.row_to_id.push(id);
207 live_nodes.insert(id);
208 }
209 let node_len = nodes.labels.len() as u32;
210 // B1: `alive_mut` is free here — the store is freshly built, so its Arc is
211 // unique and `make_mut` never clones.
212 for new_row in 0..node_len {
213 nodes.alive_mut().insert(new_row);
214 }
215
216 let mut edges = EdgeStore::new();
217 for old_row in graph.edge_store.alive.iter() {
218 let r = old_row as usize;
219 let id = graph
220 .edge_id_for_row(RowIndex::new(old_row))
221 .ok_or_else(|| GraphError::Inconsistent {
222 reason: format!("alive edge row {old_row} has no external id during compaction"),
223 })?;
224 let column_missing = |col: &str| GraphError::Inconsistent {
225 reason: format!("edge {col} column missing live row {old_row} during compaction"),
226 };
227 let source = graph
228 .edge_store
229 .source
230 .get(r)
231 .copied()
232 .ok_or_else(|| column_missing("source"))?;
233 let target = graph
234 .edge_store
235 .target
236 .get(r)
237 .copied()
238 .ok_or_else(|| column_missing("target"))?;
239 // Defense-in-depth: a surviving edge must reference surviving nodes. The
240 // mutator's delete_node cascade guarantees this for engine-produced
241 // graphs; reject a dangling edge (only constructible by a raw column
242 // build) rather than rebuild adjacency keyed by a vanished node.
243 if !live_nodes.contains(&source) || !live_nodes.contains(&target) {
244 return Err(GraphError::Inconsistent {
245 reason: format!(
246 "edge {id} survives compaction but endpoint {source} or {target} does not"
247 ),
248 });
249 }
250 edges.label.push(
251 graph
252 .edge_store
253 .label
254 .get(r)
255 .cloned()
256 .ok_or_else(|| column_missing("label"))?,
257 );
258 edges.source.push(source);
259 edges.target.push(target);
260 edges.properties.push(
261 graph
262 .edge_store
263 .properties
264 .get(r)
265 .cloned()
266 .ok_or_else(|| column_missing("properties"))?,
267 );
268 edges.row_to_id.push(id);
269 }
270 let edge_len = edges.label.len() as u32;
271 // B1: free `make_mut` on a freshly built store (see node loop above).
272 for new_row in 0..edge_len {
273 edges.alive_mut().insert(new_row);
274 }
275
276 let stats = CompactionStats::from_graph(graph);
277 let report = CompactionReport {
278 reclaimed_nodes: stats.reclaimable_nodes,
279 reclaimed_edges: stats.reclaimable_edges,
280 };
281
282 // Assemble the dense graph. meta is preserved VERBATIM — the monotonic
283 // next_node_id/next_edge_id high-water marks must NOT be recomputed from the
284 // shrunken row count, or a compacted-away external id could be reused
285 // (violating D11/D22). bound_type + generation carry through unchanged.
286 let mut dense = SeleneGraph::new(graph.meta.graph_id);
287 dense.meta = graph.meta.clone();
288 dense.node_store = nodes;
289 dense.edge_store = edges;
290
291 // Carry over property-index *registrations* (which (label, property) are
292 // indexed + their kind/name) with fresh empty indexes; the rebuilders below
293 // refill the row bitmaps from the dense columns. Cloning the source entries
294 // directly would copy stale OLD-row bitmaps.
295 for ((label, property), entry) in &graph.property_index {
296 dense.property_index.insert(
297 (label.clone(), property.clone()),
298 PropertyIndexEntry::new(TypedIndex::new(entry.kind()), entry.name.clone()),
299 );
300 }
301 for ((label, property), entry) in &graph.edge_property_index {
302 dense.edge_property_index.insert(
303 (label.clone(), property.clone()),
304 PropertyIndexEntry::new(TypedIndex::new(entry.kind()), entry.name.clone()),
305 );
306 }
307 for (key, entry) in &graph.composite_property_index {
308 dense.composite_property_index.insert(
309 key.clone(),
310 CompositePropertyIndexEntry::new(
311 crate::CompositeTypedIndex::new(entry.kinds()),
312 entry.declared_properties.clone(),
313 entry.name.clone(),
314 ),
315 );
316 }
317 for ((label, property), entry) in &graph.vector_index {
318 dense.vector_index.insert(
319 (label.clone(), property.clone()),
320 VectorIndexEntry::new(
321 crate::VectorIndex::new_with_configs(
322 entry.kind(),
323 entry.dimension(),
324 entry.hnsw_config(),
325 entry.ivf_config(),
326 )?,
327 entry.name.clone(),
328 ),
329 );
330 }
331 for ((label, property), entry) in &graph.text_index {
332 dense.text_index.insert(
333 (label.clone(), property.clone()),
334 crate::graph::TextIndexEntry::new(
335 crate::TextIndex::empty(label.clone(), property.clone()),
336 entry.name.clone(),
337 ),
338 );
339 }
340
341 // Rebuild every derived structure from the dense columns — the same chain
342 // SharedGraph::from_graph_parts_and_snapshot uses on the recovery path.
343 crate::shared::rebuild_derived_state(&mut dense)?;
344 crate::property_index::rebuild_property_indexes(&mut dense)?;
345 crate::property_index::rebuild_edge_property_indexes(&mut dense)?;
346 crate::composite_property_index::rebuild_composite_property_indexes(&mut dense)?;
347 crate::vector_index::rebuild_vector_indexes(&mut dense)?;
348 crate::text_index::rebuild_text_indexes(&mut dense)?;
349
350 // Debug-only structural net (matches the snapshot-load publication seam):
351 // re-derive every index from the compacted columns and confirm agreement.
352 #[cfg(debug_assertions)]
353 if let Err(reason) = dense.assert_indexes_consistent() {
354 return Err(GraphError::Inconsistent {
355 reason: format!("compacted graph failed consistency check: {reason}"),
356 });
357 }
358
359 Ok(CompactedCore {
360 graph: dense,
361 report,
362 })
363}
364
365#[cfg(test)]
366mod schema_tests;
367#[cfg(test)]
368mod tests;