Skip to main content

sqry_db/
compaction.rs

1//! Async background compaction for the graph arena.
2//!
3//! Compaction reclaims tombstoned node slots and rebuilds the CSR edge store
4//! from the delta buffer, reducing fragmentation and restoring O(1) edge
5//! lookup performance.
6//!
7//! # Trigger thresholds
8//!
9//! Compaction runs when either:
10//! - Arena fragmentation exceeds 20%: `free_slots / total_slots > 0.20`
11//! - Delta buffer ratio exceeds 10%: `delta_buffer.len() / csr.edge_count() > 0.10`
12//!
13//! # Concurrency
14//!
15//! Compaction runs on a background thread. Queries continue against the old
16//! CSR + delta buffer during compaction. The new CSR is swapped in atomically
17//! via `Arc` swap when ready.
18
19use std::sync::Arc;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::thread;
22
23use crate::config::QueryDbConfig;
24
25/// Handle to a background compaction thread.
26///
27/// Dropping the handle signals the thread to stop (via the `stop` flag).
28/// The thread checks this flag between compaction cycles.
29pub struct CompactionHandle {
30    stop: Arc<AtomicBool>,
31    thread: Option<thread::JoinHandle<CompactionResult>>,
32}
33
34/// Result of a compaction cycle.
35#[derive(Debug, Clone, Default)]
36pub struct CompactionResult {
37    /// Number of nodes reclaimed from tombstoned slots.
38    pub nodes_reclaimed: usize,
39    /// Number of edges merged from delta buffer into CSR.
40    pub edges_merged: usize,
41    /// Arena fragmentation before compaction (0.0–1.0).
42    pub fragmentation_before: f64,
43    /// Arena fragmentation after compaction (should be 0.0).
44    pub fragmentation_after: f64,
45    /// Whether compaction was triggered (vs. skipped because thresholds not met).
46    pub triggered: bool,
47}
48
49/// Checks whether compaction thresholds are exceeded.
50///
51/// # Arguments
52///
53/// * `free_slots` — Number of vacant arena slots (tombstoned by remove).
54/// * `total_slots` — Total arena slot count (occupied + vacant).
55/// * `delta_count` — Number of entries in the edge delta buffer.
56/// * `csr_count` — Number of edges in the CSR.
57/// * `config` — Configuration with threshold values.
58#[must_use]
59pub fn should_compact(
60    free_slots: usize,
61    total_slots: usize,
62    delta_count: usize,
63    csr_count: usize,
64    config: &QueryDbConfig,
65) -> bool {
66    if total_slots == 0 {
67        return false;
68    }
69
70    let fragmentation = free_slots as f64 / total_slots as f64;
71    if fragmentation > config.compaction_fragmentation_threshold {
72        return true;
73    }
74
75    if csr_count > 0 {
76        let delta_ratio = delta_count as f64 / csr_count as f64;
77        if delta_ratio > config.compaction_delta_ratio_threshold {
78            return true;
79        }
80    }
81
82    false
83}
84
85impl CompactionHandle {
86    /// Creates a new compaction handle (not yet started).
87    #[must_use]
88    pub fn new() -> Self {
89        Self {
90            stop: Arc::new(AtomicBool::new(false)),
91            thread: None,
92        }
93    }
94
95    /// Signals the compaction thread to stop.
96    pub fn stop(&self) {
97        self.stop.store(true, Ordering::Release);
98    }
99
100    /// Returns true if a compaction thread is running.
101    #[must_use]
102    pub fn is_running(&self) -> bool {
103        self.thread.as_ref().is_some_and(|t| !t.is_finished())
104    }
105
106    /// Waits for the compaction thread to finish and returns the result.
107    ///
108    /// Returns `None` if no thread was started or it has already been joined.
109    pub fn join(mut self) -> Option<CompactionResult> {
110        self.thread.take().and_then(|t| t.join().ok())
111    }
112}
113
114impl Default for CompactionHandle {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl Drop for CompactionHandle {
121    fn drop(&mut self) {
122        self.stop.store(true, Ordering::Release);
123        // Don't block on join in Drop — the thread will notice the stop flag
124        // and exit on its own.
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131
132    #[test]
133    fn should_compact_fragmentation() {
134        let config = QueryDbConfig::default();
135        // 20% free → triggers
136        assert!(should_compact(21, 100, 0, 1000, &config));
137        // 19% free → does not trigger
138        assert!(!should_compact(19, 100, 0, 1000, &config));
139    }
140
141    #[test]
142    fn should_compact_delta_ratio() {
143        let config = QueryDbConfig::default();
144        // 11% delta → triggers
145        assert!(should_compact(0, 100, 110, 1000, &config));
146        // 9% delta → does not trigger
147        assert!(!should_compact(0, 100, 90, 1000, &config));
148    }
149
150    #[test]
151    fn should_compact_empty_graph() {
152        let config = QueryDbConfig::default();
153        assert!(!should_compact(0, 0, 0, 0, &config));
154    }
155
156    #[test]
157    fn compaction_handle_lifecycle() {
158        let handle = CompactionHandle::new();
159        assert!(!handle.is_running());
160        handle.stop();
161        let result = handle.join();
162        assert!(result.is_none());
163    }
164}