sqry_db/compaction.rs
1//! Async background compaction for the graph arena.
2//!
3//! Compaction reclaims tombstoned node slots and rebuilds the CSR edge store
4//! from the delta buffer, reducing fragmentation and restoring O(1) edge
5//! lookup performance.
6//!
7//! # Trigger thresholds
8//!
9//! Compaction runs when either:
10//! - Arena fragmentation exceeds 20%: `free_slots / total_slots > 0.20`
11//! - Delta buffer ratio exceeds 10%: `delta_buffer.len() / csr.edge_count() > 0.10`
12//!
13//! # Concurrency
14//!
15//! Compaction runs on a background thread. Queries continue against the old
16//! CSR + delta buffer during compaction. The new CSR is swapped in atomically
17//! via `Arc` swap when ready.
18
19use std::sync::Arc;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::thread;
22
23use crate::config::QueryDbConfig;
24
25/// Handle to a background compaction thread.
26///
27/// Dropping the handle signals the thread to stop (via the `stop` flag).
28/// The thread checks this flag between compaction cycles.
29pub struct CompactionHandle {
30 stop: Arc<AtomicBool>,
31 thread: Option<thread::JoinHandle<CompactionResult>>,
32}
33
34/// Result of a compaction cycle.
35#[derive(Debug, Clone, Default)]
36pub struct CompactionResult {
37 /// Number of nodes reclaimed from tombstoned slots.
38 pub nodes_reclaimed: usize,
39 /// Number of edges merged from delta buffer into CSR.
40 pub edges_merged: usize,
41 /// Arena fragmentation before compaction (0.0–1.0).
42 pub fragmentation_before: f64,
43 /// Arena fragmentation after compaction (should be 0.0).
44 pub fragmentation_after: f64,
45 /// Whether compaction was triggered (vs. skipped because thresholds not met).
46 pub triggered: bool,
47}
48
49/// Checks whether compaction thresholds are exceeded.
50///
51/// # Arguments
52///
53/// * `free_slots` — Number of vacant arena slots (tombstoned by remove).
54/// * `total_slots` — Total arena slot count (occupied + vacant).
55/// * `delta_count` — Number of entries in the edge delta buffer.
56/// * `csr_count` — Number of edges in the CSR.
57/// * `config` — Configuration with threshold values.
58#[must_use]
59pub fn should_compact(
60 free_slots: usize,
61 total_slots: usize,
62 delta_count: usize,
63 csr_count: usize,
64 config: &QueryDbConfig,
65) -> bool {
66 if total_slots == 0 {
67 return false;
68 }
69
70 let fragmentation = free_slots as f64 / total_slots as f64;
71 if fragmentation > config.compaction_fragmentation_threshold {
72 return true;
73 }
74
75 if csr_count > 0 {
76 let delta_ratio = delta_count as f64 / csr_count as f64;
77 if delta_ratio > config.compaction_delta_ratio_threshold {
78 return true;
79 }
80 }
81
82 false
83}
84
85impl CompactionHandle {
86 /// Creates a new compaction handle (not yet started).
87 #[must_use]
88 pub fn new() -> Self {
89 Self {
90 stop: Arc::new(AtomicBool::new(false)),
91 thread: None,
92 }
93 }
94
95 /// Signals the compaction thread to stop.
96 pub fn stop(&self) {
97 self.stop.store(true, Ordering::Release);
98 }
99
100 /// Returns true if a compaction thread is running.
101 #[must_use]
102 pub fn is_running(&self) -> bool {
103 self.thread.as_ref().is_some_and(|t| !t.is_finished())
104 }
105
106 /// Waits for the compaction thread to finish and returns the result.
107 ///
108 /// Returns `None` if no thread was started or it has already been joined.
109 pub fn join(mut self) -> Option<CompactionResult> {
110 self.thread.take().and_then(|t| t.join().ok())
111 }
112}
113
114impl Default for CompactionHandle {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl Drop for CompactionHandle {
121 fn drop(&mut self) {
122 self.stop.store(true, Ordering::Release);
123 // Don't block on join in Drop — the thread will notice the stop flag
124 // and exit on its own.
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131
132 #[test]
133 fn should_compact_fragmentation() {
134 let config = QueryDbConfig::default();
135 // 20% free → triggers
136 assert!(should_compact(21, 100, 0, 1000, &config));
137 // 19% free → does not trigger
138 assert!(!should_compact(19, 100, 0, 1000, &config));
139 }
140
141 #[test]
142 fn should_compact_delta_ratio() {
143 let config = QueryDbConfig::default();
144 // 11% delta → triggers
145 assert!(should_compact(0, 100, 110, 1000, &config));
146 // 9% delta → does not trigger
147 assert!(!should_compact(0, 100, 90, 1000, &config));
148 }
149
150 #[test]
151 fn should_compact_empty_graph() {
152 let config = QueryDbConfig::default();
153 assert!(!should_compact(0, 0, 0, 0, &config));
154 }
155
156 #[test]
157 fn compaction_handle_lifecycle() {
158 let handle = CompactionHandle::new();
159 assert!(!handle.is_running());
160 handle.stop();
161 let result = handle.join();
162 assert!(result.is_none());
163 }
164}