use tracing::info;
use crate::bridge::envelope::Response;
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
impl CoreLoop {
    /// Handles a compaction request from the bridge: runs a *forced*
    /// compaction cycle and replies with the MessagePack-encoded
    /// [`CompactionStats`].
    ///
    /// Stats are informational, so an encoding failure is logged and
    /// degrades to an empty payload instead of failing the request.
    pub(in crate::data::executor) fn execute_compact(&mut self, task: &ExecutionTask) -> Response {
        // force = true: the caller explicitly asked for compaction, so the
        // per-collection tombstone-ratio threshold is bypassed.
        let result = self.run_compaction(true);
        let payload = match rmp_serde::to_vec_named(&result) {
            Ok(p) => p,
            Err(e) => {
                // Best-effort: never let stats encoding fail the compaction.
                tracing::warn!(error = %e, "failed to encode compaction stats");
                Vec::new()
            }
        };
        self.response_with_payload(task, payload)
    }

    /// Runs one full compaction cycle over every engine owned by this core.
    ///
    /// Per vector collection: tombstoned entries are physically removed when
    /// `force` is set, or when the tombstone/node ratio over the sealed
    /// segments reaches `self.compaction_tombstone_threshold`. Regardless of
    /// the vector outcome, the CSR graph is then compacted, dangling edges
    /// are swept, and one time-series segment-compaction pass runs.
    ///
    /// Returns the aggregated [`CompactionStats`] for this cycle.
    pub fn run_compaction(&mut self, force: bool) -> CompactionStats {
        let mut stats = CompactionStats::default();
        for (key, collection) in &mut self.vector_collections {
            // Totals are taken over sealed segments only; active/mutable
            // segments are not inspected here.
            let total_tombstones: usize = collection
                .sealed_segments()
                .iter()
                .map(|seg| seg.index.tombstone_count())
                .sum();
            let total_nodes: usize = collection
                .sealed_segments()
                .iter()
                .map(|seg| seg.index.len())
                .sum();
            if total_tombstones == 0 {
                // Nothing to reclaim; skip even when forced.
                continue;
            }
            // Guard against division by zero; with zero nodes the ratio is
            // treated as 0.0 (i.e. below any positive threshold).
            let ratio = if total_nodes > 0 {
                total_tombstones as f64 / total_nodes as f64
            } else {
                0.0
            };
            if !force && ratio < self.compaction_tombstone_threshold {
                continue;
            }
            let removed = collection.compact();
            if removed > 0 {
                info!(
                    core = self.core_id,
                    collection = %key,
                    removed,
                    ratio = format!("{ratio:.2}"),
                    "vector compaction: tombstones removed"
                );
                stats.vectors_compacted += removed;
                stats.collections_compacted += 1;
            }
        }
        // Graph-side maintenance is unconditional: compact the CSR store and
        // remove edges whose endpoints are gone.
        self.csr.compact();
        stats.csr_compacted = true;
        stats.edges_swept = self.sweep_dangling_edges();
        stats.segments_merged = self.run_segment_compaction(force);
        if stats.vectors_compacted > 0 || stats.edges_swept > 0 || stats.segments_merged > 0 {
            info!(
                core = self.core_id,
                vectors_compacted = stats.vectors_compacted,
                collections_compacted = stats.collections_compacted,
                edges_swept = stats.edges_swept,
                segments_merged = stats.segments_merged,
                "compaction cycle complete"
            );
        }
        stats
    }

    /// Periodic maintenance hook, intended to be driven by the core's loop.
    ///
    /// On every call:
    /// 1. Ticks the checkpoint coordinator and services the resulting flush
    ///    plan per engine.
    /// 2. Completes the checkpoint once the coordinator reports clean with
    ///    no dirty pages remaining.
    /// 3. Ticks the KV expiry wheel and logs any reaped keys.
    ///
    /// Compaction itself is rate-limited by `self.compaction_interval`: a
    /// non-forced [`Self::run_compaction`] pass runs at most once per
    /// interval.
    ///
    /// Returns `true` when maintenance work happened this call — either a
    /// compaction pass ran, or (when still inside the interval) the tick
    /// produced a non-empty flush plan.
    pub fn maybe_run_maintenance(&mut self) -> bool {
        // Ask the coordinator which engines should flush and how many pages.
        let flush_plan = self.checkpoint_coordinator.tick();
        for (engine, pages) in &flush_plan {
            match engine.as_str() {
                "vector" => {
                    let flushed = self.checkpoint_vector_indexes();
                    // Never credit more pages than the plan requested.
                    self.checkpoint_coordinator
                        .record_flush("vector", flushed.min(*pages));
                }
                "crdt" => {
                    let flushed = self.checkpoint_crdt_engines();
                    self.checkpoint_coordinator
                        .record_flush("crdt", flushed.min(*pages));
                }
                "sparse" => {
                    // NOTE(review): sparse and timeseries credit the full
                    // planned page count without any visible flush call here —
                    // presumably flushed elsewhere; confirm this is intended.
                    self.checkpoint_coordinator.record_flush("sparse", *pages);
                }
                "timeseries" => {
                    self.checkpoint_coordinator
                        .record_flush("timeseries", *pages);
                }
                // Unknown engines in the plan are ignored on purpose.
                _ => {}
            }
        }
        // A checkpoint is finalized only when the coordinator is clean, this
        // tick actually produced work, and no dirty pages remain.
        if self.checkpoint_coordinator.is_clean()
            && !flush_plan.is_empty()
            && self.checkpoint_coordinator.total_dirty_pages() == 0
        {
            self.checkpoint_coordinator
                .complete_checkpoint(self.watermark.as_u64());
        }
        // KV TTL handling: advance the expiry wheel to "now" and log what
        // was reaped (summary at debug, per-key at info on a dedicated
        // target for downstream filtering).
        {
            let now_ms = crate::engine::kv::current_ms();
            let expired_keys = self.kv_engine.tick_expiry(now_ms);
            if !expired_keys.is_empty() {
                tracing::debug!(
                    core = self.core_id,
                    reaped = expired_keys.len(),
                    backlog = self.kv_engine.expiry_backlog(),
                    "kv expiry wheel tick"
                );
                for ek in &expired_keys {
                    tracing::info!(
                        target: "nodedb::kv::expired",
                        tenant_id = ek.tenant_id,
                        collection = %ek.collection,
                        key_len = ek.key.len(),
                        "kv key expired"
                    );
                }
            }
        }
        // Rate-limit the compaction pass: skip if the previous one was too
        // recent, but still report whether the checkpoint tick did work.
        let now = std::time::Instant::now();
        if let Some(last) = self.last_maintenance
            && now.duration_since(last) < self.compaction_interval
        {
            return !flush_plan.is_empty();
        }
        self.last_maintenance = Some(now);
        // Non-forced: per-collection tombstone thresholds apply.
        self.run_compaction(false);
        true
    }
}
impl CoreLoop {
    /// One pass of time-series partition compaction across all registries.
    ///
    /// For each collection registry: marks mergeable partition groups as
    /// merging (the number of groups is capped at
    /// `segment_compaction_config.max_segments_per_pass` unless `force`),
    /// marks expired partitions deleted, and purges previously-deleted ones.
    ///
    /// Returns the total number of partitions marked for merging across all
    /// collections.
    fn run_segment_compaction(&mut self, force: bool) -> usize {
        // Wall-clock milliseconds since the epoch. A pre-epoch clock degrades
        // to 0, which effectively makes nothing look mergeable/expired —
        // acceptable, since that state should never occur in practice.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        let max_per_pass = self.segment_compaction_config.max_segments_per_pass;
        let mut total_merged = 0;
        for (collection, registry) in &mut self.ts_registries {
            let mut groups = registry.find_mergeable(now_ms);
            if !force {
                // Bound per-pass work; a forced pass takes every group.
                groups.truncate(max_per_pass);
            }
            for group in &groups {
                for &start_ts in group {
                    registry.mark_merging(start_ts);
                }
                total_merged += group.len();
            }
            // Retention: expired partitions are tombstoned first, then any
            // previously-deleted ones are physically purged.
            let expired = registry.find_expired(now_ms);
            for start_ts in &expired {
                registry.mark_deleted(*start_ts);
            }
            let purged = registry.purge_deleted();
            if !groups.is_empty() || !purged.is_empty() {
                info!(
                    core = self.core_id,
                    collection = %collection,
                    merge_groups = groups.len(),
                    expired = expired.len(),
                    purged = purged.len(),
                    "timeseries partition compaction"
                );
            }
            // NOTE(review): two oddities in this forced-compaction fallback:
            // (1) `total_merged` accumulates across collections, so this
            //     branch stops firing for later collections once any earlier
            //     one merged something — a per-collection counter may have
            //     been intended; confirm.
            // (2) despite the message, nothing is actually marked here —
            //     only a log line is emitted; confirm whether a
            //     `mark_merging` call is missing.
            if force && total_merged == 0 {
                let sealed_count = registry.sealed_count();
                if sealed_count >= 2 {
                    info!(
                        core = self.core_id,
                        collection = %collection,
                        sealed_count,
                        "forced compaction: marking sealed partitions for merge"
                    );
                }
            }
        }
        total_merged
    }
}
/// Summary of one compaction cycle, produced by `CoreLoop::run_compaction`
/// and serialized with named fields (`rmp_serde::to_vec_named`) into the
/// compact-task response payload.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CompactionStats {
    /// Tombstoned vectors physically removed, summed over all collections.
    pub vectors_compacted: usize,
    /// Number of vector collections with at least one vector removed.
    pub collections_compacted: usize,
    /// `true` after every completed cycle — CSR compaction is unconditional.
    pub csr_compacted: bool,
    /// Dangling graph edges removed by the post-compaction sweep.
    pub edges_swept: usize,
    /// Time-series partitions marked for merging during this cycle.
    pub segments_merged: usize,
}
#[cfg(test)]
mod tests {
    use crate::engine::vector::hnsw::graph::HnswParams;

    /// Deleting half the vectors and compacting must drop every tombstone
    /// while leaving the surviving vectors untouched.
    #[test]
    fn compaction_removes_tombstones() {
        let mut index =
            crate::engine::vector::hnsw::graph::HnswIndex::new(4, HnswParams::default());
        (0..20u32).for_each(|i| {
            index.insert(vec![i as f32; 4]);
        });
        (0..10u32).for_each(|i| index.delete(i));
        assert_eq!(index.tombstone_count(), 10);
        assert_eq!(index.live_count(), 10);
        let reclaimed = index.compact();
        assert_eq!(reclaimed, 10);
        assert_eq!(index.live_count(), 10);
        assert_eq!(index.tombstone_count(), 0);
    }

    /// Back-to-back maintenance calls must be rate-limited: the first does
    /// work, the second lands inside the compaction interval and does not.
    #[test]
    fn maintenance_respects_interval() {
        let tmp = tempfile::tempdir().unwrap();
        let (mut core, _req_tx, _resp_rx) =
            crate::data::executor::core_loop::tests::make_core_with_dir(tmp.path());
        assert!(core.maybe_run_maintenance());
        assert!(!core.maybe_run_maintenance());
    }

    /// `force = true` must run the full cycle (CSR compaction included) even
    /// when no tombstone threshold has been crossed.
    #[test]
    fn forced_compaction_ignores_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        let (mut core, _req_tx, _resp_rx) =
            crate::data::executor::core_loop::tests::make_core_with_dir(tmp.path());
        let stats = core.run_compaction(true);
        assert_eq!(stats.vectors_compacted, 0);
        assert!(stats.csr_compacted);
    }
}