// tonbo 0.4.0-a1
//
// Embedded database for serverless and edge runtimes, storing data as Parquet on S3.
// Documentation
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use fusio::{executor::NoopExecutor, mem::fs::InMemoryFs};
use futures::{TryStreamExt, executor::block_on};
use typed_arrow_dyn::{DynCell, DynRow};

use crate::{
    db::{DB, DbInner, Expr, wal::apply_dyn_wal_batch},
    extractor::{self, KeyExtractError},
    inmem::{
        immutable::memtable::MVCC_TOMBSTONE_COL,
        policy::{BatchesThreshold, NeverSeal},
    },
    key::KeyOwned,
    mode::DynModeConfig,
    mvcc::Timestamp,
    test::build_batch,
};

/// Ingesting a one-row batch together with an empty tombstone vector must be rejected
/// with `KeyExtractError::TombstoneLengthMismatch { expected: 1, actual: 0 }`.
#[tokio::test(flavor = "current_thread")]
async fn ingest_tombstone_length_mismatch() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));

    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("config");
    let executor = Arc::new(NoopExecutor);
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::clone(&executor))
        .await
        .expect("db")
        .into_inner();

    // Exactly one row, but zero tombstone flags supplied alongside it.
    let only_row = DynRow(vec![Some(DynCell::Str("k".into())), Some(DynCell::I32(1))]);
    let batch: RecordBatch = build_batch(schema.clone(), vec![only_row]).expect("batch");

    let outcome = db.ingest_with_tombstones(batch, Vec::new()).await;
    let err = outcome.expect_err("length mismatch");

    let is_expected_mismatch = matches!(
        err,
        KeyExtractError::TombstoneLengthMismatch {
            expected: 1,
            actual: 0
        }
    );
    assert!(is_expected_mismatch);
}

/// Ingests two rows, where only the second (`k2`) is flagged as a tombstone, then checks:
/// - each key has exactly one version in the mutable memtable, and that version's
///   tombstone flag matches the input vector;
/// - a snapshot scan over `id IS NOT NULL` surfaces only the live key `k1`.
#[tokio::test(flavor = "current_thread")]
async fn ingest_batch_with_tombstones_marks_versions_and_visibility() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let extractor = extractor::projection_for_field(schema.clone(), 0).expect("extractor");
    let executor = Arc::new(NoopExecutor);
    let config = DynModeConfig::new(schema.clone(), extractor).expect("config");
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::clone(&executor))
        .await
        .expect("db")
        .into_inner();

    let rows = vec![
        DynRow(vec![Some(DynCell::Str("k1".into())), Some(DynCell::I32(1))]),
        DynRow(vec![Some(DynCell::Str("k2".into())), Some(DynCell::I32(2))]),
    ];
    let batch: RecordBatch = build_batch(schema.clone(), rows).expect("batch");
    db.ingest_with_tombstones(batch, vec![false, true])
        .await
        .expect("ingest");

    // `k1` was ingested live: one version, tombstone flag unset.
    let chain_k1 = db
        .mem_read()
        .inspect_versions(&KeyOwned::from("k1"))
        .expect("chain k1");
    assert_eq!(chain_k1.len(), 1);
    assert!(!chain_k1[0].1);

    // `k2` was ingested as a tombstone: one version, tombstone flag set.
    let chain_k2 = db
        .mem_read()
        .inspect_versions(&KeyOwned::from("k2"))
        .expect("chain k2");
    assert_eq!(chain_k2.len(), 1);
    assert!(chain_k2[0].1);

    // Await the scan pipeline directly. `futures::executor::block_on` would park the
    // single tokio current-thread worker that is driving this test, which can deadlock
    // if any of these futures depend on the runtime.
    let pred = Expr::is_not_null("id");
    let snapshot = db.begin_snapshot().await.expect("snapshot");
    let plan = snapshot
        .plan_scan(&db, &pred, None, None)
        .await
        .expect("plan");
    let stream = db.execute_scan(plan).await.expect("exec");
    let visible: Vec<String> = stream
        .try_collect::<Vec<_>>()
        .await
        .expect("collect")
        .into_iter()
        .flat_map(|batch| {
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("utf8 col");
            col.iter()
                .flatten()
                .map(|s| s.to_string())
                .collect::<Vec<_>>()
        })
        .collect();
    assert_eq!(visible, vec!["k1".to_string()]);
}

#[tokio::test(flavor = "current_thread")]
async fn ingest_with_tombstones_profile_reports_coherent_timings() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let extractor = extractor::projection_for_field(schema.clone(), 0).expect("extractor");
    let executor = Arc::new(NoopExecutor);
    let config = DynModeConfig::new(schema.clone(), extractor).expect("config");
    let db = DB::new(config, Arc::clone(&executor)).await.expect("db");

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["k1", "k2"])) as _,
            Arc::new(Int32Array::from(vec![1, 2])) as _,
        ],
    )
    .expect("batch");

    let profile = db
        .ingest_with_tombstones_with_profile(batch, vec![false, true])
        .await
        .expect("ingest");

    assert_eq!(
        profile.wal_append_ns(),
        profile
            .wal_append_submit_ns()
            .saturating_add(profile.wal_append_wait_ns()),
        "wal append total should equal submit plus durable wait"
    );
    assert_eq!(
        profile.wal_commit_ns(),
        profile
            .wal_commit_submit_ns()
            .saturating_add(profile.wal_commit_wait_ns()),
        "wal commit total should equal submit plus durable wait"
    );

    let component_sum = profile
        .partition_ns()
        .saturating_add(profile.wal_append_ns())
        .saturating_add(profile.wal_commit_ns())
        .saturating_add(profile.mutable_insert_ns())
        .saturating_add(profile.seal_ns())
        .saturating_add(profile.minor_compaction_ns());
    assert!(
        profile.total_ns() >= component_sum,
        "profile total should cover measured phase timings"
    );

    let snapshot = db.begin_snapshot().await.expect("snapshot");
    let batches = snapshot
        .scan(&db)
        .collect()
        .await
        .expect("collect visible rows");
    let visible: Vec<String> = batches
        .into_iter()
        .flat_map(|batch| {
            let ids = batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("utf8 col");
            ids.iter()
                .flatten()
                .map(|value| value.to_string())
                .collect::<Vec<_>>()
        })
        .collect();
    assert_eq!(visible, vec!["k1".to_string()]);
}

/// Replays a two-row WAL batch that has no tombstone column. Checks that each key ends up
/// with exactly one live version (tombstone flag `false`) stamped with the batch's commit
/// timestamp.
#[tokio::test(flavor = "current_thread")]
async fn apply_dyn_wal_batch_inserts_live_rows() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("config");
    let executor = Arc::new(NoopExecutor);
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::clone(&executor))
        .await
        .expect("db")
        .into_inner();

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["k1", "k2"])) as _,
            Arc::new(Int32Array::from(vec![1, 2])) as _,
        ],
    )
    .expect("batch");
    // Both rows are replayed with the same per-row commit timestamp.
    let commit_ts = Timestamp::new(5);
    let commit_array: ArrayRef = Arc::new(UInt64Array::from(vec![commit_ts.get(); 2]));

    apply_dyn_wal_batch(&db, batch, commit_array, commit_ts).expect("apply");

    // Neither row is tombstoned, so both keys must expose a single live version.
    let live_k1 = db
        .mem_read()
        .inspect_versions(&KeyOwned::from("k1"))
        .expect("live key");
    assert_eq!(live_k1, vec![(commit_ts, false)]);

    let live_k2 = db
        .mem_read()
        .inspect_versions(&KeyOwned::from("k2"))
        .expect("second key");
    assert_eq!(live_k2, vec![(commit_ts, false)]);
}

/// WAL replay is given a batch that carries the reserved `MVCC_TOMBSTONE_COL` column on
/// top of the user schema. `apply_dyn_wal_batch` must fail, and the error must be either
/// `SchemaMismatch` or `Wal`; any other variant fails the test.
#[tokio::test(flavor = "current_thread")]
async fn apply_dyn_wal_batch_rejects_tombstone_payloads() {
    let user_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, true),
    ]));
    let config = DynModeConfig::from_key_name(user_schema.clone(), "id").expect("config");
    let executor = Arc::new(NoopExecutor);
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::clone(&executor))
        .await
        .expect("db")
        .into_inner();

    // User columns plus the reserved tombstone column.
    let wal_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, true),
        Field::new(MVCC_TOMBSTONE_COL, DataType::Boolean, false),
    ]));
    let id_col: ArrayRef = Arc::new(StringArray::from(vec!["k"]));
    let v_col: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
    let tombstone_col: ArrayRef = Arc::new(BooleanArray::from(vec![Some(false)]));
    let wal_batch =
        RecordBatch::try_new(wal_schema, vec![id_col, v_col, tombstone_col]).expect("wal batch");

    let commit_ts = Timestamp::new(22);
    let commit_array: ArrayRef = Arc::new(UInt64Array::from(vec![commit_ts.get()]));

    match apply_dyn_wal_batch(&db, wal_batch, commit_array, commit_ts)
        .expect_err("apply should fail")
    {
        KeyExtractError::SchemaMismatch { .. } | KeyExtractError::Wal(_) => {}
        other => panic!("unexpected error variant: {other:?}"),
    }
}

/// WAL replay accepts a user schema that contains its own `_tombstone` (Utf8) column.
/// The single replayed row is recorded as one live version (tombstone flag `false`) at
/// the commit timestamp.
#[tokio::test(flavor = "current_thread")]
async fn apply_dyn_wal_batch_allows_user_tombstone_column() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("_tombstone", DataType::Utf8, false),
        Field::new("v", DataType::Int32, true),
    ]));
    let executor = Arc::new(NoopExecutor);
    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("config");
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::clone(&executor))
        .await
        .expect("db")
        .into_inner();

    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["k"])),
        Arc::new(StringArray::from(vec![Some("flag")])),
        Arc::new(Int32Array::from(vec![Some(9)])),
    ];
    let batch = RecordBatch::try_new(schema, columns).expect("batch");

    let commit_ts = Timestamp::new(30);
    let commit_array: ArrayRef = Arc::new(UInt64Array::from(vec![commit_ts.get()]));

    apply_dyn_wal_batch(&db, batch, commit_array, commit_ts).expect("apply");

    let expected = vec![(commit_ts, false)];
    let versions = db
        .mem_read()
        .inspect_versions(&KeyOwned::from("k"))
        .expect("versions");
    assert_eq!(versions, expected);
}

/// Inspects snapshot metadata on a fresh database and again after a single live ingest.
/// Checks the read timestamp, the manifest head's `last_manifest_txn`, and
/// `latest_version`.
///
/// NOTE(review): after the ingest, `read_ts` is expected to equal `Timestamp::new(0)`.
/// If `Timestamp::MIN` is also zero, this does not show the clock advancing. Confirm the
/// intended commit-timestamp numbering.
#[tokio::test(flavor = "current_thread")]
async fn begin_snapshot_tracks_commit_clock() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let executor = Arc::new(NoopExecutor);
    let config = DynModeConfig::new(
        schema.clone(),
        extractor::projection_for_field(schema.clone(), 0).expect("extractor"),
    )
    .expect("config");
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::clone(&executor))
        .await
        .expect("db")
        .into_inner();

    // Fresh database: minimum read timestamp, no manifest txn, no version.
    let initial = db.begin_snapshot().await.expect("snapshot");
    assert_eq!(initial.read_view().read_ts(), Timestamp::MIN);
    assert!(initial.head().last_manifest_txn.is_none());
    assert!(initial.latest_version().is_none());

    let row = DynRow(vec![Some(DynCell::Str("k1".into())), Some(DynCell::I32(1))]);
    let batch: RecordBatch = build_batch(schema.clone(), vec![row]).expect("batch");
    db.ingest_with_tombstones(batch, vec![false])
        .await
        .expect("ingest");

    // In this setup the ingest leaves the manifest txn and latest version unset.
    let after_ingest = db.begin_snapshot().await.expect("snapshot after ingest");
    assert_eq!(after_ingest.read_view().read_ts(), Timestamp::new(0));
    assert!(after_ingest.head().last_manifest_txn.is_none());
    assert!(after_ingest.latest_version().is_none());
}

/// With a `BatchesThreshold { batches: 1 }` seal policy, ingesting a single batch should
/// move the mutable memtable into exactly one immutable segment.
#[tokio::test(flavor = "current_thread")]
async fn dynamic_seal_on_batches_threshold() {
    // Simple schema: `id` (Utf8, key column) and `v` (Int32).
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    // A single batch containing two rows.
    let rows = vec![
        DynRow(vec![Some(DynCell::Str("a".into())), Some(DynCell::I32(1))]),
        DynRow(vec![Some(DynCell::Str("b".into())), Some(DynCell::I32(2))]),
    ];
    let batch: RecordBatch = build_batch(schema.clone(), rows).expect("valid dyn rows");

    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("key name");
    let mut db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("schema ok")
        .into_inner();
    // Seal as soon as one batch has been ingested.
    db.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));
    assert_eq!(db.num_immutable_segments(), 0);
    db.ingest(batch).await.expect("insert batch");
    assert_eq!(db.num_immutable_segments(), 1);
}

/// With the memtable capacity shrunk to 1 and policy-based sealing disabled (`NeverSeal`),
/// three single-row ingests of the same key must all succeed. Each time capacity is hit,
/// the memtable is sealed and insertion continues, leaving two immutable segments and one
/// batch in the mutable memtable.
#[tokio::test(flavor = "current_thread")]
async fn auto_seals_when_memtable_hits_capacity() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("key name");
    let mut db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, Arc::new(NoopExecutor))
        .await
        .expect("schema ok")
        .into_inner();

    // Only the MemtableFull recovery path may produce immutable segments here.
    db.set_mem_capacity(1);
    db.set_seal_policy(Arc::new(NeverSeal));

    let single_row_batch = |value: i32| {
        let row = DynRow(vec![
            Some(DynCell::Str("k".into())),
            Some(DynCell::I32(value)),
        ]);
        build_batch(schema.clone(), vec![row]).expect("batch")
    };

    for (value, label) in [(1, "ingest 1"), (2, "ingest 2"), (3, "ingest 3")] {
        db.ingest(single_row_batch(value)).await.expect(label);
    }

    // Two capacity-driven seals; the last ingest remains in the mutable memtable.
    assert_eq!(db.num_immutable_segments(), 2);
    assert_eq!(db.mem_read().batch_count(), 1);
}