use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use fusio::executor::{Executor, Timer};
use tracing::instrument;
use crate::{
db::DbInner,
manifest::ManifestFs,
observability::{log_info, log_warn},
ondisk::sstable::{SsTable, SsTableConfig, SsTableDescriptor, SsTableError, SsTableId},
};
/// Turns a DB's accumulated immutable segments into a single SSTable once their count
/// reaches a configured threshold.
pub(crate) struct MinorCompactor {
    /// Minimum number of immutable segments that must be present before a flush is
    /// attempted. Always at least 1 (clamped on construction).
    segment_threshold: usize,
    /// Level written into the descriptor of every SSTable this compactor produces.
    target_level: usize,
    /// Counter from which SSTable ids are drawn. It is reference-counted, so several
    /// owners may draw from the same sequence.
    next_id: Arc<AtomicU64>,
}

impl MinorCompactor {
    /// Test-only constructor that owns a fresh id counter beginning at `start_id`.
    #[cfg(test)]
    pub(crate) fn new(segment_threshold: usize, target_level: usize, start_id: u64) -> Self {
        let allocator = Arc::new(AtomicU64::new(start_id));
        Self::with_id_allocator(segment_threshold, target_level, allocator)
    }

    /// Builds a compactor that allocates SSTable ids from `next_id`.
    ///
    /// A `segment_threshold` of 0 is treated as 1, so a flush is never attempted while
    /// there are no immutable segments.
    pub(crate) fn with_id_allocator(
        segment_threshold: usize,
        target_level: usize,
        next_id: Arc<AtomicU64>,
    ) -> Self {
        let segment_threshold = usize::max(segment_threshold, 1);
        Self {
            next_id,
            target_level,
            segment_threshold,
        }
    }

    /// Reserves the next SSTable id and pairs it with this compactor's target level.
    fn next_descriptor(&self) -> SsTableDescriptor {
        let raw_id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let table_id = SsTableId::new(raw_id);
        SsTableDescriptor::new(table_id, self.target_level)
    }

    /// Flushes the DB's immutable segments into one SSTable (via
    /// `flush_immutables_with_descriptor`) when at least `segment_threshold` of them exist.
    ///
    /// Returns `Ok(None)` without reserving an SSTable id when the threshold is not met.
    /// Otherwise it returns `Ok(Some(table))` holding the table produced by the flush.
    ///
    /// # Errors
    ///
    /// Any [`SsTableError`] from the flush is logged and then returned unchanged. The id
    /// reserved for the failed attempt is not handed back to the allocator.
    #[instrument(
        name = "compaction::minor",
        skip(self, db, config),
        fields(
            component = "compaction",
            target_level = self.target_level,
            segment_threshold = self.segment_threshold
        )
    )]
    pub(crate) async fn maybe_compact<FS, E>(
        &self,
        db: &DbInner<FS, E>,
        config: Arc<SsTableConfig>,
    ) -> Result<Option<SsTable>, SsTableError>
    where
        FS: ManifestFs<E>,
        E: Executor + Timer + Clone,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        let immutable_segments = db.num_immutable_segments();
        let below_threshold = immutable_segments < self.segment_threshold;
        if below_threshold {
            return Ok(None);
        }

        log_info!(
            component = "compaction",
            event = "minor_compaction_start",
            immutable_segments,
            target_level = self.target_level,
        );

        let descriptor = self.next_descriptor();
        let outcome = db
            .flush_immutables_with_descriptor(config, descriptor)
            .await;

        // Log on a borrow so the original result can be handed back untouched.
        match &outcome {
            Ok(table) => match table.descriptor().stats() {
                Some(stats) => {
                    log_info!(
                        component = "compaction",
                        event = "minor_compaction_complete",
                        target_level = self.target_level,
                        output_rows = stats.rows,
                        output_bytes = stats.bytes,
                        output_tombstones = stats.tombstones,
                    );
                }
                None => {
                    log_info!(
                        component = "compaction",
                        event = "minor_compaction_complete",
                        target_level = self.target_level,
                    );
                }
            },
            Err(err) => {
                log_warn!(
                    component = "compaction",
                    event = "minor_compaction_failed",
                    target_level = self.target_level,
                    error = ?err,
                );
            }
        }

        outcome.map(Some)
    }
}
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use std::sync::{Arc, atomic::Ordering};

    use arrow_schema::{DataType, Field, Schema};
    use fusio::{
        disk::LocalFs, dynamic::DynFs, executor::NoopExecutor, mem::fs::InMemoryFs, path::Path,
    };
    use typed_arrow_dyn::{DynCell, DynRow};

    use super::MinorCompactor;
    use crate::{
        db::{DB, DbInner},
        ondisk::sstable::SsTableConfig,
        test::build_batch,
    };

    /// Opens an in-memory DB whose schema is `(id: Utf8, v: Int32)` keyed on `id`. It also
    /// returns an SSTable config that shares that schema.
    ///
    /// NOTE: the SSTable config writes through `LocalFs` to a fixed `/tmp` path that every
    /// test shares. Flushed tables land on the real local filesystem, and nothing in this
    /// module removes them.
    async fn build_db() -> (Arc<SsTableConfig>, DbInner<InMemoryFs, NoopExecutor>) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ]));
        let config = crate::schema::SchemaBuilder::from_schema(schema)
            .primary_key("id")
            .with_metadata()
            .build()
            .expect("key field");
        let schema = Arc::clone(&config.schema);
        let executor = Arc::new(NoopExecutor);
        let db = DB::<InMemoryFs, NoopExecutor>::builder(config)
            .in_memory("compaction-test")
            .expect("in_memory config")
            .open_with_executor(Arc::clone(&executor))
            .await
            .expect("db init")
            .into_inner();
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let cfg = Arc::new(SsTableConfig::new(
            schema.clone(),
            fs,
            Path::from("/tmp/tonbo-compaction-test"),
        ));
        (cfg, db)
    }

    /// Installs a seal-after-every-batch policy and ingests a single row, leaving exactly
    /// one immutable segment in `db`.
    async fn seal_one_segment(cfg: &SsTableConfig, db: &mut DbInner<InMemoryFs, NoopExecutor>) {
        db.set_seal_policy(Arc::new(crate::inmem::policy::BatchesThreshold {
            batches: 1,
        }));
        let rows = vec![DynRow(vec![
            Some(DynCell::Str("k".into())),
            Some(DynCell::I32(1)),
        ])];
        let batch = build_batch(cfg.schema().clone(), rows).expect("batch");
        db.ingest(batch).await.expect("ingest");
        assert_eq!(db.num_immutable_segments(), 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn zero_threshold_is_clamped_and_empty_db_is_noop() {
        let (cfg, db) = build_db().await;
        let compactor = MinorCompactor::new(0, 0, 3);
        // A zero threshold must not allow flushing an empty set of segments.
        assert_eq!(compactor.segment_threshold, 1);
        let result = compactor.maybe_compact(&db, cfg).await;
        assert!(matches!(result, Ok(None)));
        assert_eq!(db.num_immutable_segments(), 0);
        assert_eq!(compactor.next_id.load(Ordering::Relaxed), 3);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn below_threshold_noop() {
        let (cfg, mut db) = build_db().await;
        // One real segment, one short of the threshold: exercises the non-empty path.
        seal_one_segment(&cfg, &mut db).await;
        let compactor = MinorCompactor::new(2, 0, 7);
        let result = compactor.maybe_compact(&db, cfg).await;
        assert!(matches!(result, Ok(None)));
        // Nothing was flushed and no SSTable id was reserved.
        assert_eq!(db.num_immutable_segments(), 1);
        assert_eq!(compactor.next_id.load(Ordering::Relaxed), 7);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn threshold_met_invokes_flush() {
        let (cfg, mut db) = build_db().await;
        seal_one_segment(&cfg, &mut db).await;
        let compactor = MinorCompactor::new(1, 0, 9);
        let table = compactor
            .maybe_compact(&db, cfg)
            .await
            .expect("flush result")
            .expect("sstable");
        assert_eq!(db.num_immutable_segments(), 0);
        // Exactly one id was drawn from the allocator.
        assert_eq!(compactor.next_id.load(Ordering::Relaxed), 10);
        let descriptor = table.descriptor();
        assert_eq!(descriptor.id().raw(), 9);
        assert_eq!(descriptor.level(), 0);
        let stats = descriptor.stats().expect("descriptor stats");
        assert_eq!(stats.rows, 1);
        assert!(stats.bytes > 0);
    }
}