// tonbo 0.4.0-a1
//
// Embedded database for serverless and edge runtimes, storing data as Parquet on S3.
// Documentation
//! Naïve minor-compaction driver for flushing immutable memtables.

use std::sync::{
    Arc,
    atomic::{AtomicU64, Ordering},
};

use fusio::executor::{Executor, Timer};
use tracing::instrument;

use crate::{
    db::DbInner,
    manifest::ManifestFs,
    observability::{log_info, log_warn},
    ondisk::sstable::{SsTable, SsTableConfig, SsTableDescriptor, SsTableError, SsTableId},
};

/// Naïve minor-compaction driver: once the count of immutable memtable segments
/// reaches `segment_threshold`, it flushes them into one SST at `target_level`.
pub(crate) struct MinorCompactor {
    /// Minimum number of immutable segments required before a flush is attempted.
    segment_threshold: usize,
    /// Level stamped on every SST descriptor this compactor hands out.
    target_level: usize,
    /// Shared counter from which SST ids are reserved (one increment per flush attempt).
    next_id: Arc<AtomicU64>,
}

impl MinorCompactor {
    /// Construct a compactor with its own SST id counter starting at `start_id`.
    ///
    /// Test-only shorthand for [`MinorCompactor::with_id_allocator`]; the same
    /// threshold clamping applies.
    #[cfg(test)]
    pub(crate) fn new(segment_threshold: usize, target_level: usize, start_id: u64) -> Self {
        let allocator = Arc::new(AtomicU64::new(start_id));
        Self::with_id_allocator(segment_threshold, target_level, allocator)
    }

    /// Construct a compactor that reserves SST ids from the shared `next_id` counter.
    ///
    /// A `segment_threshold` of `0` is raised to `1`, so a flush is never attempted
    /// while there are no immutable segments. Every produced table is tagged with
    /// `target_level`.
    pub(crate) fn with_id_allocator(
        segment_threshold: usize,
        target_level: usize,
        next_id: Arc<AtomicU64>,
    ) -> Self {
        let segment_threshold = segment_threshold.max(1);
        Self {
            segment_threshold,
            target_level,
            next_id,
        }
    }

    /// Reserve the next SST id and wrap it in a descriptor at this compactor's level.
    fn next_descriptor(&self) -> SsTableDescriptor {
        // Relaxed is enough here: the atomic read-modify-write alone guarantees each
        // caller a distinct id; this file publishes no other data through the counter.
        let reserved = self.next_id.fetch_add(1, Ordering::Relaxed);
        let id = SsTableId::new(reserved);
        SsTableDescriptor::new(id, self.target_level)
    }

    /// Flush the database's immutable segments into a single SST once enough exist.
    ///
    /// Returns `Ok(None)` without flushing when fewer than `segment_threshold`
    /// immutable segments are present. Otherwise a fresh descriptor is reserved
    /// (its id stays consumed even if the flush fails) and the written table is
    /// returned as `Ok(Some(table))`.
    ///
    /// # Errors
    ///
    /// Any [`SsTableError`] from the flush is logged at warn level and returned.
    #[instrument(
        name = "compaction::minor",
        skip(self, db, config),
        fields(
            component = "compaction",
            target_level = self.target_level,
            segment_threshold = self.segment_threshold
        )
    )]
    pub(crate) async fn maybe_compact<FS, E>(
        &self,
        db: &DbInner<FS, E>,
        config: Arc<SsTableConfig>,
    ) -> Result<Option<SsTable>, SsTableError>
    where
        FS: ManifestFs<E>,
        E: Executor + Timer + Clone,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        let immutable_segments = db.num_immutable_segments();
        if immutable_segments < self.segment_threshold {
            return Ok(None);
        }

        log_info!(
            component = "compaction",
            event = "minor_compaction_start",
            immutable_segments,
            target_level = self.target_level,
        );

        let descriptor = self.next_descriptor();
        let flushed = db
            .flush_immutables_with_descriptor(config, descriptor)
            .await;
        let table = match flushed {
            Ok(table) => table,
            Err(err) => {
                log_warn!(
                    component = "compaction",
                    event = "minor_compaction_failed",
                    target_level = self.target_level,
                    error = ?err,
                );
                return Err(err);
            }
        };

        // Output statistics are only logged when the descriptor carries them.
        match table.descriptor().stats() {
            Some(stats) => {
                log_info!(
                    component = "compaction",
                    event = "minor_compaction_complete",
                    target_level = self.target_level,
                    output_rows = stats.rows,
                    output_bytes = stats.bytes,
                    output_tombstones = stats.tombstones,
                );
            }
            None => {
                log_info!(
                    component = "compaction",
                    event = "minor_compaction_complete",
                    target_level = self.target_level,
                );
            }
        }
        Ok(Some(table))
    }
}

#[cfg(all(test, feature = "tokio"))]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use fusio::{
        disk::LocalFs, dynamic::DynFs, executor::NoopExecutor, mem::fs::InMemoryFs, path::Path,
    };
    use typed_arrow_dyn::{DynCell, DynRow};

    use super::MinorCompactor;
    use crate::{
        db::{DB, DbInner},
        ondisk::sstable::SsTableConfig,
        test::build_batch,
    };

    /// Open an in-memory DB whose schema is `id: Utf8` (primary key) and `v: Int32`,
    /// paired with an SST config that writes to the local filesystem.
    async fn build_db() -> (Arc<SsTableConfig>, DbInner<InMemoryFs, NoopExecutor>) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("v", DataType::Int32, false),
        ]));
        let config = crate::schema::SchemaBuilder::from_schema(schema)
            .primary_key("id")
            .with_metadata()
            .build()
            .expect("key field");
        let schema = Arc::clone(&config.schema);
        let executor = Arc::new(NoopExecutor);
        let db = DB::<InMemoryFs, NoopExecutor>::builder(config)
            .in_memory("compaction-test")
            .expect("in_memory config")
            .open_with_executor(executor)
            .await
            .expect("db init")
            .into_inner();

        // NOTE(review): SSTs are written to the real local filesystem at a fixed path,
        // so concurrent runs of this suite share (and may clobber) the same directory.
        let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
        let cfg = Arc::new(SsTableConfig::new(
            schema,
            fs,
            Path::from("/tmp/tonbo-compaction-test"),
        ));
        (cfg, db)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn below_threshold_noop() {
        let (cfg, db) = build_db().await;
        let compactor = MinorCompactor::new(2, 0, 7);
        let result = compactor.maybe_compact(&db, cfg).await;
        assert!(matches!(result, Ok(None)));
        assert_eq!(db.num_immutable_segments(), 0);
    }

    /// A threshold of zero is clamped to one, so an empty DB must not trigger a flush,
    /// and the skipped attempt must not consume an SST id.
    #[tokio::test(flavor = "multi_thread")]
    async fn zero_threshold_is_clamped() {
        let (cfg, db) = build_db().await;
        let compactor = MinorCompactor::new(0, 0, 3);
        let result = compactor.maybe_compact(&db, cfg).await;
        assert!(matches!(result, Ok(None)));
        assert_eq!(
            compactor
                .next_id
                .load(std::sync::atomic::Ordering::Relaxed),
            3
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn threshold_met_invokes_flush() {
        let (cfg, mut db) = build_db().await;
        // Seal after every batch so a single ingest yields one immutable segment.
        db.set_seal_policy(Arc::new(crate::inmem::policy::BatchesThreshold {
            batches: 1,
        }));
        let rows = vec![DynRow(vec![
            Some(DynCell::Str("k".into())),
            Some(DynCell::I32(1)),
        ])];
        let batch = build_batch(cfg.schema().clone(), rows).expect("batch");
        db.ingest(batch).await.expect("ingest");
        assert_eq!(db.num_immutable_segments(), 1);

        let compactor = MinorCompactor::new(1, 0, 9);
        let table = compactor
            .maybe_compact(&db, cfg)
            .await
            .expect("flush result")
            .expect("sstable");
        assert_eq!(db.num_immutable_segments(), 0);
        let descriptor = table.descriptor();
        assert_eq!(descriptor.id().raw(), 9);
        assert_eq!(descriptor.level(), 0);
        let stats = descriptor.stats().expect("descriptor stats");
        assert_eq!(stats.rows, 1);
        assert!(stats.bytes > 0);
    }
}