use tracing::info;
use crate::data::executor::core_loop::CoreLoop;
use nodedb_types::DatabaseId;
impl CoreLoop {
    /// Runs one timeseries compaction pass over every registry in `ts_registries`.
    ///
    /// For each `(tenant, collection)` registry the pass:
    ///
    /// 1. Tries to acquire a maintenance-budget lease for the tenant's database. This
    ///    step is skipped when `force` is set or when no budget tracker is configured.
    ///    If the lease cannot be acquired the collection is deferred and left untouched.
    /// 2. Marks every partition of each mergeable group as merging. Unless `force` is
    ///    set, at most `max_segments_per_pass` *groups* are taken per collection.
    /// 3. Marks expired partitions as deleted and purges deleted partitions.
    ///
    /// Returns `(merged, deferred)`, where `merged` is the total number of partitions
    /// marked as merging across all collections and `deferred` is the number of
    /// collections skipped because their database was over its maintenance budget.
    pub(super) fn run_segment_compaction(&mut self, force: bool) -> (usize, usize) {
        // Wall-clock "now" in milliseconds since the Unix epoch; a clock set before the
        // epoch falls back to 0. The cast narrows u128 to i64.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        let max_per_pass = self.segment_compaction_config.max_segments_per_pass;
        let mut total_merged = 0usize;
        let mut total_deferred = 0usize;

        // `is_bitemporal` is a method on `self`, so it cannot be called while the loop
        // below holds `&mut self.ts_registries`. Resolve the flags up front.
        let bitemporal_flags: std::collections::HashMap<(crate::types::TenantId, String), bool> =
            self.ts_registries
                .keys()
                .map(|(tid, col)| {
                    let flag = self.is_bitemporal(tid.as_u64(), col);
                    ((*tid, col.clone()), flag)
                })
                .collect();

        let budget = self.maintenance_budget.clone();
        let core_id = self.core_id;

        for ((tid, collection), registry) in &mut self.ts_registries {
            // `tenant_database_map` is a different field from `ts_registries`, so it can
            // be read directly here; tenants without a mapping use the default database.
            let db = self
                .tenant_database_map
                .get(tid)
                .copied()
                .unwrap_or(DatabaseId::DEFAULT);

            // The lease is bound for the rest of this iteration.
            // NOTE(review): presumably it releases its budget share on drop — confirm.
            let _lease = match budget.as_ref() {
                Some(tracker) if !force => match tracker.try_acquire(db, 0.0) {
                    Some(lease) => Some(lease),
                    None => {
                        total_deferred += 1;
                        tracing::debug!(
                            core = core_id,
                            db = db.as_u64(),
                            collection = %collection,
                            "segment compaction deferred: database over maintenance budget"
                        );
                        continue;
                    }
                },
                // Forced pass, or no budget tracker configured: run without a lease.
                _ => None,
            };

            let bitemporal = bitemporal_flags
                .get(&(*tid, collection.clone()))
                .copied()
                .unwrap_or(false);

            let mut groups = registry.find_mergeable(now_ms);
            if !force {
                groups.truncate(max_per_pass);
            }

            // Partitions marked for merge in THIS collection only.
            let mut merged_here = 0usize;
            for group in &groups {
                for &start_ts in group {
                    registry.mark_merging(start_ts);
                }
                merged_here += group.len();
            }
            total_merged += merged_here;

            let expired = registry.find_expired(now_ms, bitemporal);
            for start_ts in &expired {
                registry.mark_deleted(*start_ts);
            }
            let purged = registry.purge_deleted();

            if !groups.is_empty() || !purged.is_empty() {
                info!(
                    core = core_id,
                    collection = %collection,
                    merge_groups = groups.len(),
                    expired = expired.len(),
                    purged = purged.len(),
                    "timeseries partition compaction"
                );
            }

            // Gate on this collection's own count. The running cross-collection total
            // would suppress this diagnostic for every collection visited after the
            // first one that merged anything, making it depend on map iteration order.
            //
            // NOTE(review): this branch only logs; nothing is marked for merge here
            // despite the message text — confirm whether the marking happens elsewhere.
            if force && merged_here == 0 {
                let sealed_count = registry.sealed_count();
                if sealed_count >= 2 {
                    info!(
                        core = core_id,
                        collection = %collection,
                        sealed_count,
                        "forced compaction: marking sealed partitions for merge"
                    );
                }
            }
        }

        (total_merged, total_deferred)
    }
}