1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// SPDX-License-Identifier: BUSL-1.1
//! Top-level compaction orchestration.
//!
//! `run_compaction` drives all five compaction phases (vector, CSR, dangling
//! edges, timeseries L1, FTS LSM) and aggregates the results into a
//! `CompactionStats`. `execute_compact` is the entry point dispatched from
//! the Control Plane via `PhysicalPlan::Compact`.
use tracing::info;
use crate::bridge::envelope::Response;
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
use nodedb_types::DatabaseId;
use super::budget::BudgetGate;
use super::stats::CompactionStats;
impl CoreLoop {
/// Execute an on-demand compaction request.
///
/// Forces compaction of all vector collections (regardless of tombstone
/// ratio), CSR compaction, and dangling edge sweep. Returns a summary
/// payload with compaction statistics.
pub(in crate::data::executor) fn execute_compact(&mut self, task: &ExecutionTask) -> Response {
let result = self.run_compaction(true);
let payload = match zerompk::to_msgpack_vec(&result) {
Ok(p) => p,
Err(e) => {
tracing::warn!(error = %e, "failed to encode compaction stats");
Vec::new()
}
};
self.response_with_payload(task, payload)
}
/// Run all maintenance/compaction tasks.
///
/// Called periodically from the runtime event loop (idle maintenance)
/// and on-demand via `PhysicalPlan::Compact`.
///
/// When `force` is false (periodic), only compacts collections whose
/// tombstone ratio exceeds the threshold. When `force` is true
/// (on-demand), compacts everything.
///
/// Per-database CPU budget is enforced when a budget tracker is installed.
/// Collections whose owning database has exhausted its per-minute budget
/// are deferred; this only affects periodic maintenance, not forced compaction.
pub fn run_compaction(&mut self, force: bool) -> CompactionStats {
let mut stats = CompactionStats::default();
// 1. Vector compaction: remove tombstoned nodes from HNSW indexes.
// Collect keys first to avoid borrow conflict on `self`.
let vector_keys: Vec<_> = self.vector_collections.keys().cloned().collect();
for key in vector_keys {
let tid = key.0;
let db = self.database_for_tenant(tid);
// Budget gate. The lease, if any, MUST be bound to a `let` so it
// lives across `collection.compact()` below — its `Drop` impl is
// what records actual elapsed CPU into the per-db window.
let _lease = match self.acquire_maintenance_lease(db, force) {
BudgetGate::Granted(lease) => lease,
BudgetGate::Deferred => {
stats.vectors_deferred += 1;
tracing::debug!(
core = self.core_id,
db = db.as_u64(),
collection = &key.1,
"vector compaction deferred: database over maintenance budget"
);
continue;
}
};
let collection = match self.vector_collections.get_mut(&key) {
Some(c) => c,
None => continue,
};
let total_tombstones: usize = collection
.sealed_segments()
.iter()
.map(|seg| seg.index.tombstone_count())
.sum();
let total_nodes: usize = collection
.sealed_segments()
.iter()
.map(|seg| seg.index.len())
.sum();
if total_tombstones == 0 {
continue;
}
let ratio = if total_nodes > 0 {
total_tombstones as f64 / total_nodes as f64
} else {
0.0
};
if !force && ratio < self.compaction_tombstone_threshold {
continue;
}
let removed = collection.compact();
if removed > 0 {
info!(
core = self.core_id,
collection = &key.1,
removed,
ratio = format!("{ratio:.2}"),
"vector compaction: tombstones removed"
);
stats.vectors_compacted += removed;
stats.collections_compacted += 1;
}
// `_lease` drops here, recording elapsed wall-clock into the budget window.
}
// 2. CSR compaction: merge write buffers into dense arrays.
// CSR is not per-database keyed; use DEFAULT as the budget scope.
let csr_db = DatabaseId::DEFAULT;
match self.acquire_maintenance_lease(csr_db, force) {
BudgetGate::Granted(_lease) => {
match self.csr.compact_all() {
Ok(()) => stats.csr_compacted = true,
Err(e) => tracing::warn!(
error = %e,
"CSR compaction rejected by memory governor; skipping"
),
}
// _lease drops here, recording elapsed wall-clock into the budget window.
}
BudgetGate::Deferred => {
stats.csr_deferred = true;
tracing::debug!(
core = self.core_id,
db = csr_db.as_u64(),
"CSR compaction deferred: database over maintenance budget"
);
}
}
// 3. Dangling edge sweep — budget-gated against DEFAULT (sweep is
// process-wide; per-tenant attribution happens inside the sweep loop).
let edges_db = DatabaseId::DEFAULT;
match self.acquire_maintenance_lease(edges_db, force) {
BudgetGate::Granted(_lease) => {
stats.edges_swept = self.sweep_dangling_edges();
// _lease drops here, recording elapsed wall-clock into the budget window.
}
BudgetGate::Deferred => {
stats.edges_deferred = true;
tracing::debug!(
core = self.core_id,
db = edges_db.as_u64(),
"edge sweep deferred: database over maintenance budget"
);
}
}
// 4. L1 segment compaction: per-(tenant, collection) → per-database gated.
let (merged, deferred) = self.run_segment_compaction(force);
stats.segments_merged = merged;
stats.segments_deferred = deferred;
// 5. FTS LSM level compaction: merge L0→L1→… segments per collection.
let fts_outcome = self.run_fts_compaction(force);
stats.fts_compacted = fts_outcome.merged;
stats.fts_deferred = fts_outcome.deferred;
stats.fts_enumeration_failed = fts_outcome.enumeration_failed;
if stats.vectors_compacted > 0
|| stats.edges_swept > 0
|| stats.segments_merged > 0
|| stats.fts_compacted > 0
|| stats.vectors_deferred > 0
|| stats.csr_deferred
|| stats.edges_deferred
|| stats.segments_deferred > 0
|| stats.fts_deferred > 0
|| stats.fts_enumeration_failed
{
info!(
core = self.core_id,
vectors_compacted = stats.vectors_compacted,
collections_compacted = stats.collections_compacted,
edges_swept = stats.edges_swept,
segments_merged = stats.segments_merged,
fts_compacted = stats.fts_compacted,
vectors_deferred = stats.vectors_deferred,
csr_deferred = stats.csr_deferred,
edges_deferred = stats.edges_deferred,
segments_deferred = stats.segments_deferred,
fts_deferred = stats.fts_deferred,
fts_enumeration_failed = stats.fts_enumeration_failed,
"compaction cycle complete"
);
}
stats
}
}