Skip to main content

selene_graph/
compaction.rs

1//! CORE graph compaction mechanism (BRIEF-Item-4b / 4c).
2//!
3//! [`compact_core`] is a pure transform: given a [`SeleneGraph`], it builds a
4//! fresh graph whose rows are dense (every dead / aborted-tx hole row dropped),
5//! preserving external `NodeId` / `EdgeId` and the monotonic allocator
6//! high-water marks, then rebuilds all derived state from the compacted columns
7//! via the existing recovery-path rebuilders. It performs NO publication and NO
8//! snapshot I/O — the snapshot writer wiring is BRIEF-Item-4c, the
9//! create-time row-allocation change (arith → append) and dropping the
10//! `rebuild_id_maps` identity bootstrap land when a compacted graph first goes
11//! live (also 4c). Database strings are plain owned values, so compaction has no
12//! string-pool reclamation work to perform.
13//!
14//! Because 4a left edges + adjacency keyed by stable external `NodeId`, a row
15//! renumber does not touch edge endpoints or adjacency — only the row-keyed
16//! columns, the alive bitmaps, the row-indexed label/property indexes, and the
17//! id↔row maps move, and all of those are rebuilt from the compacted columns.
18//!
19//! **Cross-epoch WAL replay (BRIEF-Item-4e — RESOLVED, no new mechanism).**
20//! Compaction drops dead rows, so a *deleted* external id that gets compacted
21//! away resolves `NotFound` afterwards (was `NotAlive` under 4a's Option B). The
22//! concern was that a `Change::NodeDeleted` / `EdgeDeleted` written *before* a
23//! compaction, replayed *after* loading the compacted snapshot, would route
24//! through `require_live_*` (`recovery_state`) and hard-error on the reclaimed
25//! id. This cannot happen in the normal flow: a snapshot is published via
26//! `WalWriter::rotate_with_manifest`, which both advances the MANIFEST
27//! `live_snapshot_seq` WAL floor AND physically truncates the WAL (`set_len(0)`
28//! then a fresh header). Pre-compaction entries are therefore *gone* AND below
29//! the recovery floor (`recovery.rs` replays only `header.sequence > floor`) —
30//! they can never be replayed against a compacted snapshot. The only cross-epoch
31//! replay is of *post*-snapshot entries, which resolve against the dense rows:
32//! a post-compaction `NodeCreated` appends (BRIEF-Item-4c), and a post-compaction
33//! `NodeDeleted` of a survivor finds it in the compacted snapshot (proven by
34//! `recover_tests::nodeid_split_recovery`). The `require_live_*` hard-error is
35//! deliberately *retained* as genuine-corruption detection — a "no-op the
36//! reclaimed-id delete" alternative was rejected because it would mask a truly
37//! inconsistent WAL. The MANIFEST `compaction_epoch` field stays reserved (`0`).
38
39use std::collections::HashSet;
40
41use selene_core::NodeId;
42
43use crate::error::{GraphError, GraphResult};
44use crate::graph::{
45    CompositePropertyIndexEntry, PropertyIndexEntry, SeleneGraph, VectorIndexEntry,
46};
47use crate::store::{EdgeStore, NodeStore, RowIndex};
48use crate::typed_index::TypedIndex;
49
50const BASIS_POINTS_DENOMINATOR: u64 = 10_000;
51
52/// Minimum dead node+edge rows before compaction recommendation can fire.
53///
54/// Compaction rebuilds the whole live graph, so tiny amounts of row churn should
55/// remain explicit caller policy rather than default maintenance advice.
56pub const COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS: u64 = 1_024;
57
58/// Minimum dead-row ratio, scaled by 10,000, for compaction advice.
59pub const COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS: u64 = 2_500;
60
61/// What CORE reclaimed during a compaction pass.
62#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
63pub struct CompactionReport {
64    /// Node rows (dead + hole) dropped.
65    pub reclaimed_nodes: u64,
66    /// Edge rows (dead + hole) dropped.
67    pub reclaimed_edges: u64,
68}
69
70/// Cheap snapshot of row-space pressure before compaction.
71///
72/// Deletes clear heavyweight row payloads immediately, but the row slots and
73/// stable id mappings remain until compaction. These counters let maintenance
74/// policy decide whether a dense rebuild is worth scheduling without first
75/// performing that rebuild.
76#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
77pub struct CompactionStats {
78    /// Allocated node rows, including live rows and reclaimable dead rows.
79    pub allocated_nodes: u64,
80    /// Alive node rows.
81    pub live_nodes: u64,
82    /// Dead node rows that a compaction pass can reclaim.
83    pub reclaimable_nodes: u64,
84    /// Allocated edge rows, including live rows and reclaimable dead rows.
85    pub allocated_edges: u64,
86    /// Alive edge rows.
87    pub live_edges: u64,
88    /// Dead edge rows that a compaction pass can reclaim.
89    pub reclaimable_edges: u64,
90}
91
92impl CompactionStats {
93    /// Build compaction counters for `graph` without rebuilding any rows.
94    #[must_use]
95    pub fn from_graph(graph: &SeleneGraph) -> Self {
96        let allocated_nodes = graph.node_store.len() as u64;
97        let live_nodes = graph.node_count() as u64;
98        let allocated_edges = graph.edge_store.len() as u64;
99        let live_edges = graph.edge_count() as u64;
100        Self {
101            allocated_nodes,
102            live_nodes,
103            reclaimable_nodes: allocated_nodes.saturating_sub(live_nodes),
104            allocated_edges,
105            live_edges,
106            reclaimable_edges: allocated_edges.saturating_sub(live_edges),
107        }
108    }
109
110    /// Total allocated node and edge rows.
111    #[must_use]
112    pub const fn allocated_rows(self) -> u64 {
113        self.allocated_nodes.saturating_add(self.allocated_edges)
114    }
115
116    /// Total alive node and edge rows.
117    #[must_use]
118    pub const fn live_rows(self) -> u64 {
119        self.live_nodes.saturating_add(self.live_edges)
120    }
121
122    /// Total reclaimable dead node and edge rows.
123    #[must_use]
124    pub const fn reclaimable_rows(self) -> u64 {
125        self.reclaimable_nodes
126            .saturating_add(self.reclaimable_edges)
127    }
128
129    /// True when no dead rows remain to compact.
130    #[must_use]
131    pub const fn is_dense(self) -> bool {
132        self.reclaimable_nodes == 0 && self.reclaimable_edges == 0
133    }
134
135    /// Return reclaimable rows divided by allocated rows, scaled by 10,000.
136    #[must_use]
137    pub fn reclaimable_row_basis_points(self) -> u64 {
138        let allocated = self.allocated_rows();
139        if allocated == 0 {
140            return 0;
141        }
142        let scaled = u128::from(self.reclaimable_rows()) * u128::from(BASIS_POINTS_DENOMINATOR);
143        (scaled / u128::from(allocated)) as u64
144    }
145
146    /// Return true when current row-space pressure merits maintenance compaction.
147    ///
148    /// The recommendation is diagnostic only: reads and writes never compact
149    /// automatically, and callers still decide when to run maintenance.
150    #[must_use]
151    pub fn compaction_recommended(self) -> bool {
152        self.reclaimable_rows() >= COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS
153            && self.reclaimable_row_basis_points()
154                >= COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS
155    }
156}
157
158/// The product of compacting the CORE store.
159pub struct CompactedCore {
160    /// The dense, fully-rebuilt compacted graph (no dead/hole rows).
161    pub graph: SeleneGraph,
162    /// What CORE reclaimed.
163    pub report: CompactionReport,
164}
165
166/// Compact `graph`: drop every dead / hole row, renumber rows dense (ascending
167/// old-row order), and rebuild all derived state. External ids and the
168/// allocator high-water marks are preserved verbatim.
169///
170/// # Errors
171///
172/// Returns [`GraphError`] if an alive row has no mapped external id (a corrupt
173/// id↔row mapping) or if the rebuilt graph fails its consistency check.
174pub fn compact_core(graph: &SeleneGraph) -> GraphResult<CompactedCore> {
175    let mut nodes = NodeStore::new();
176    let mut live_nodes: HashSet<NodeId> =
177        HashSet::with_capacity(graph.node_store.alive.len() as usize);
178    for old_row in graph.node_store.alive.iter() {
179        let r = old_row as usize;
180        let id = graph
181            .node_id_for_row(RowIndex::new(old_row))
182            .ok_or_else(|| GraphError::Inconsistent {
183                reason: format!("alive node row {old_row} has no external id during compaction"),
184            })?;
185        let column_missing = |col: &str| GraphError::Inconsistent {
186            reason: format!("node {col} column missing live row {old_row} during compaction"),
187        };
188        // Strict (fail-loud) reads, symmetric with the edge columns below: a
189        // misaligned column on an alive row is corruption, not an empty row.
190        nodes.labels.push(
191            graph
192                .node_store
193                .labels
194                .get(r)
195                .cloned()
196                .ok_or_else(|| column_missing("labels"))?,
197        );
198        nodes.properties.push(
199            graph
200                .node_store
201                .properties
202                .get(r)
203                .cloned()
204                .ok_or_else(|| column_missing("properties"))?,
205        );
206        nodes.row_to_id.push(id);
207        live_nodes.insert(id);
208    }
209    let node_len = nodes.labels.len() as u32;
210    // B1: `alive_mut` is free here — the store is freshly built, so its Arc is
211    // unique and `make_mut` never clones.
212    for new_row in 0..node_len {
213        nodes.alive_mut().insert(new_row);
214    }
215
216    let mut edges = EdgeStore::new();
217    for old_row in graph.edge_store.alive.iter() {
218        let r = old_row as usize;
219        let id = graph
220            .edge_id_for_row(RowIndex::new(old_row))
221            .ok_or_else(|| GraphError::Inconsistent {
222                reason: format!("alive edge row {old_row} has no external id during compaction"),
223            })?;
224        let column_missing = |col: &str| GraphError::Inconsistent {
225            reason: format!("edge {col} column missing live row {old_row} during compaction"),
226        };
227        let source = graph
228            .edge_store
229            .source
230            .get(r)
231            .copied()
232            .ok_or_else(|| column_missing("source"))?;
233        let target = graph
234            .edge_store
235            .target
236            .get(r)
237            .copied()
238            .ok_or_else(|| column_missing("target"))?;
239        // Defense-in-depth: a surviving edge must reference surviving nodes. The
240        // mutator's delete_node cascade guarantees this for engine-produced
241        // graphs; reject a dangling edge (only constructible by a raw column
242        // build) rather than rebuild adjacency keyed by a vanished node.
243        if !live_nodes.contains(&source) || !live_nodes.contains(&target) {
244            return Err(GraphError::Inconsistent {
245                reason: format!(
246                    "edge {id} survives compaction but endpoint {source} or {target} does not"
247                ),
248            });
249        }
250        edges.label.push(
251            graph
252                .edge_store
253                .label
254                .get(r)
255                .cloned()
256                .ok_or_else(|| column_missing("label"))?,
257        );
258        edges.source.push(source);
259        edges.target.push(target);
260        edges.properties.push(
261            graph
262                .edge_store
263                .properties
264                .get(r)
265                .cloned()
266                .ok_or_else(|| column_missing("properties"))?,
267        );
268        edges.row_to_id.push(id);
269    }
270    let edge_len = edges.label.len() as u32;
271    // B1: free `make_mut` on a freshly built store (see node loop above).
272    for new_row in 0..edge_len {
273        edges.alive_mut().insert(new_row);
274    }
275
276    let stats = CompactionStats::from_graph(graph);
277    let report = CompactionReport {
278        reclaimed_nodes: stats.reclaimable_nodes,
279        reclaimed_edges: stats.reclaimable_edges,
280    };
281
282    // Assemble the dense graph. meta is preserved VERBATIM — the monotonic
283    // next_node_id/next_edge_id high-water marks must NOT be recomputed from the
284    // shrunken row count, or a compacted-away external id could be reused
285    // (violating D11/D22). bound_type + generation carry through unchanged.
286    let mut dense = SeleneGraph::new(graph.meta.graph_id);
287    dense.meta = graph.meta.clone();
288    dense.node_store = nodes;
289    dense.edge_store = edges;
290
291    // Carry over property-index *registrations* (which (label, property) are
292    // indexed + their kind/name) with fresh empty indexes; the rebuilders below
293    // refill the row bitmaps from the dense columns. Cloning the source entries
294    // directly would copy stale OLD-row bitmaps.
295    for ((label, property), entry) in &graph.property_index {
296        dense.property_index.insert(
297            (label.clone(), property.clone()),
298            PropertyIndexEntry::new(TypedIndex::new(entry.kind()), entry.name.clone()),
299        );
300    }
301    for ((label, property), entry) in &graph.edge_property_index {
302        dense.edge_property_index.insert(
303            (label.clone(), property.clone()),
304            PropertyIndexEntry::new(TypedIndex::new(entry.kind()), entry.name.clone()),
305        );
306    }
307    for (key, entry) in &graph.composite_property_index {
308        dense.composite_property_index.insert(
309            key.clone(),
310            CompositePropertyIndexEntry::new(
311                crate::CompositeTypedIndex::new(entry.kinds()),
312                entry.declared_properties.clone(),
313                entry.name.clone(),
314            ),
315        );
316    }
317    for ((label, property), entry) in &graph.vector_index {
318        dense.vector_index.insert(
319            (label.clone(), property.clone()),
320            VectorIndexEntry::new(
321                crate::VectorIndex::new_with_configs(
322                    entry.kind(),
323                    entry.dimension(),
324                    entry.hnsw_config(),
325                    entry.ivf_config(),
326                )?,
327                entry.name.clone(),
328            ),
329        );
330    }
331    for ((label, property), entry) in &graph.text_index {
332        dense.text_index.insert(
333            (label.clone(), property.clone()),
334            crate::graph::TextIndexEntry::new(
335                crate::TextIndex::empty(label.clone(), property.clone()),
336                entry.name.clone(),
337            ),
338        );
339    }
340
341    // Rebuild every derived structure from the dense columns — the same chain
342    // SharedGraph::from_graph_parts_and_snapshot uses on the recovery path.
343    crate::shared::rebuild_derived_state(&mut dense)?;
344    crate::property_index::rebuild_property_indexes(&mut dense)?;
345    crate::property_index::rebuild_edge_property_indexes(&mut dense)?;
346    crate::composite_property_index::rebuild_composite_property_indexes(&mut dense)?;
347    crate::vector_index::rebuild_vector_indexes(&mut dense)?;
348    crate::text_index::rebuild_text_indexes(&mut dense)?;
349
350    // Debug-only structural net (matches the snapshot-load publication seam):
351    // re-derive every index from the compacted columns and confirm agreement.
352    #[cfg(debug_assertions)]
353    if let Err(reason) = dense.assert_indexes_consistent() {
354        return Err(GraphError::Inconsistent {
355            reason: format!("compacted graph failed consistency check: {reason}"),
356        });
357    }
358
359    Ok(CompactedCore {
360        graph: dense,
361        report,
362    })
363}
364
365#[cfg(test)]
366mod schema_tests;
367#[cfg(test)]
368mod tests;