use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use nodedb_fts::backend::FtsBackend as _;
use nodedb_fts::lsm::compaction::{
CompactError, CompactionConfig, SegmentMeta, compact_level, needs_compaction, parse_level,
segment_id,
};
use nodedb_mem::MemoryGovernor;
use tracing::info;
use crate::data::executor::core_loop::CoreLoop;
use nodedb_types::TenantId;
use super::budget::BudgetGate;
/// Summary of one FTS compaction pass, as returned by `run_fts_compaction`.
pub(super) struct FtsCompactionOutcome {
/// Running total of merged segment ids: each successful `compact_commit`
/// adds the number of ids that were merged into the new segment.
pub merged: u64,
/// Number of collections whose compaction was postponed during the pass:
/// segment listing failed, the maintenance-budget gate deferred, the
/// compaction reported a budget or backend error, or the commit failed.
pub deferred: u64,
/// Set when the list of FTS collections could not be enumerated, in which
/// case no collection was examined at all.
pub enumeration_failed: bool,
}
/// Borrowed argument bundle for `run_compact_level`, which forwards every
/// field, in declaration order, to `compact_level`.
struct FtsCompactParams<'a> {
/// FTS backend handed to `compact_level`.
backend: &'a crate::engine::sparse::fts_redb::backend::RedbFtsBackend,
/// Tenant id in its raw `u64` form.
tid: u64,
/// Name of the collection being compacted.
collection: &'a str,
/// Segment metadata for the collection (all levels, not only `level`).
segments: &'a [SegmentMeta],
/// Level selected for compaction by `needs_compaction`.
level: u32,
/// Optional memory governor passed through to `compact_level`.
governor: Option<&'a Arc<MemoryGovernor>>,
}
impl CoreLoop {
    /// Runs one FTS compaction pass over every collection reported by
    /// `list_all_fts_collections`.
    ///
    /// `force` is handed unchanged to the maintenance-lease gate of each
    /// collection. When enumeration itself fails, a warning is logged and the
    /// returned outcome carries `enumeration_failed = true` with both counters
    /// at zero; otherwise the outcome accumulates the per-collection results.
    pub(super) fn run_fts_compaction(&mut self, force: bool) -> FtsCompactionOutcome {
        let config = CompactionConfig::default();

        let collections = match self.inverted.list_all_fts_collections() {
            Ok(found) => found,
            Err(err) => {
                tracing::warn!(
                    core = self.core_id,
                    error = %err,
                    "FTS compaction: failed to enumerate collections"
                );
                return FtsCompactionOutcome {
                    merged: 0,
                    deferred: 0,
                    enumeration_failed: true,
                };
            }
        };

        let mut outcome = FtsCompactionOutcome {
            merged: 0,
            deferred: 0,
            enumeration_failed: false,
        };
        for (tenant, name) in collections {
            self.compact_one_fts_collection(tenant, &name, force, &config, &mut outcome);
        }
        outcome
    }

    /// Compacts at most one level of a single collection and records the
    /// result in `outcome`.
    ///
    /// Steps, each of which may end the attempt early:
    /// 1. list the collection's segments (failure counts as deferred);
    /// 2. ask `needs_compaction` which level, if any, should be compacted;
    /// 3. obtain a maintenance lease for the tenant's database (a deferred
    ///    gate counts as deferred);
    /// 4. run `compact_level` (budget and backend errors count as deferred);
    /// 5. commit the merged segment one level up (failure counts as deferred,
    ///    success adds the number of merged segment ids to `outcome.merged`).
    fn compact_one_fts_collection(
        &mut self,
        tid: TenantId,
        collection: &str,
        force: bool,
        config: &CompactionConfig,
        outcome: &mut FtsCompactionOutcome,
    ) {
        let db = self.database_for_tenant(tid);
        let tenant_raw = tid.as_u64();

        let segment_ids = match self.inverted.backend().list_segments(tenant_raw, collection) {
            Ok(listed) => listed,
            Err(err) => {
                tracing::warn!(
                    core = self.core_id,
                    tid = tenant_raw,
                    collection = %collection,
                    error = %err,
                    "FTS compaction: failed to list segments — deferred to next cycle"
                );
                outcome.deferred += 1;
                return;
            }
        };
        if segment_ids.is_empty() {
            return;
        }

        // Only the id and the level parsed from it are filled in; `size` is
        // always zero here.
        let mut segments: Vec<SegmentMeta> = Vec::with_capacity(segment_ids.len());
        for id in segment_ids.iter() {
            segments.push(SegmentMeta {
                segment_id: id.clone(),
                level: parse_level(id),
                size: 0,
            });
        }

        let Some(level) = needs_compaction(&segments, config) else {
            return;
        };

        // The lease is bound (not discarded) so it stays alive until this
        // function returns, covering both the merge and the commit.
        let _lease = match self.acquire_maintenance_lease(db, force) {
            BudgetGate::Granted(granted) => granted,
            BudgetGate::Deferred => {
                outcome.deferred += 1;
                tracing::debug!(
                    core = self.core_id,
                    db = db.as_u64(),
                    collection = %collection,
                    "FTS compaction deferred: database over maintenance budget"
                );
                return;
            }
        };

        let governor = self.governor.as_ref();
        let compacted = run_compact_level(FtsCompactParams {
            backend: self.inverted.backend(),
            tid: tenant_raw,
            collection,
            segments: &segments,
            level,
            governor,
        });

        let (new_bytes, merged_ids) = match compacted {
            Ok(Some(result)) => result,
            // `compact_level` produced nothing to commit.
            Ok(None) => return,
            Err(CompactError::Budget(err)) => {
                outcome.deferred += 1;
                tracing::debug!(
                    core = self.core_id,
                    tid = tenant_raw,
                    collection = %collection,
                    error = %err,
                    "FTS compaction: memory budget exhausted, deferred"
                );
                return;
            }
            Err(CompactError::Backend(err)) => {
                outcome.deferred += 1;
                tracing::warn!(
                    core = self.core_id,
                    tid = tenant_raw,
                    collection = %collection,
                    level,
                    error = %err,
                    "FTS compaction: segment read failed, deferred"
                );
                return;
            }
        };

        // The merged output is written as a fresh segment one level above the
        // level that was compacted.
        let new_level = level + 1;
        let new_seg_id = segment_id(fts_new_segment_id(), new_level);
        let merged_count = merged_ids.len() as u64;

        if let Err(err) =
            self.inverted
                .compact_commit(tid, collection, &new_seg_id, &new_bytes, &merged_ids)
        {
            outcome.deferred += 1;
            tracing::warn!(
                core = self.core_id,
                tid = tenant_raw,
                collection = %collection,
                level,
                error = %err,
                "FTS compaction: commit failed, original segments preserved — deferred"
            );
            return;
        }

        outcome.merged += merged_count;
        info!(
            core = self.core_id,
            tid = tenant_raw,
            collection = %collection,
            level,
            new_level,
            merged = merged_count,
            "FTS LSM level compaction committed"
        );
    }
}
fn run_compact_level(
p: FtsCompactParams<'_>,
) -> Result<Option<nodedb_fts::lsm::compaction::CompactionResult>, CompactError<crate::Error>> {
compact_level(
p.backend,
p.tid,
p.collection,
p.segments,
p.level,
p.governor,
)
}
/// Produces a numeric id for a newly written FTS segment.
///
/// Layout of the returned value:
/// - upper 32 bits: the low 32 bits of a process-wide call counter that
///   starts at zero;
/// - lower 32 bits: the low 32 bits of the wall-clock time in nanoseconds
///   since the Unix epoch, or zero if the clock reads earlier than the epoch.
///
/// Because the counter occupies the upper half, ids from one process are
/// distinct until the counter's low 32 bits wrap around.
fn fts_new_segment_id() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    const LOW_32: u64 = 0xFFFF_FFFF;
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed) & LOW_32;
    let clock_bits = match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Truncating the u128 nanosecond count is intentional: only the low
        // 32 bits are kept.
        Ok(elapsed) => (elapsed.as_nanos() as u64) & LOW_32,
        Err(_) => 0,
    };
    (sequence << 32) | clock_bits
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Generates 10,000 ids back to back and fails on the first repeat.
    #[test]
    fn segment_ids_are_unique_within_process() {
        const SAMPLES: usize = 10_000;

        let mut seen = HashSet::with_capacity(SAMPLES);
        for _ in 0..SAMPLES {
            let id = fts_new_segment_id();
            let is_new = seen.insert(id);
            assert!(
                is_new,
                "fts_new_segment_id produced a duplicate within a single process run"
            );
        }
    }
}