tonbo 0.4.0-a1

Embedded database for serverless and edge runtimes, storing data as Parquet on S3
Documentation
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use fusio::{DynFs, disk::LocalFs, executor::NoopExecutor, mem::fs::InMemoryFs, path::Path};
use typed_arrow_dyn::{DynCell, DynRow};

use crate::{
    db::{DB, DbInner},
    inmem::policy::BatchesThreshold,
    mode::DynModeConfig,
    mvcc::Timestamp,
    ondisk::sstable::{SsTableConfig, SsTableDescriptor, SsTableError, SsTableId},
    test::build_batch,
};

/// Flushing when no immutable segments exist must fail with
/// [`SsTableError::NoImmutableSegments`] and leave the immutable-segment
/// count at zero.
#[tokio::test(flavor = "current_thread")]
async fn flush_without_immutables_errors() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));
    let config = DynModeConfig::from_key_name(schema.clone(), "id").expect("key name config");
    let executor = Arc::new(NoopExecutor);
    let db: DbInner<InMemoryFs, NoopExecutor> = DB::new(config, executor)
        .await
        .expect("db init")
        .into_inner();

    // Nothing has been ingested, so there is nothing sealed that could be flushed.
    // NOTE(review): the SSTable target is the real local filesystem at a fixed
    // `/tmp` path; presumably it is never written because the flush fails
    // before any I/O — confirm.
    let fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let sstable_cfg = Arc::new(SsTableConfig::new(
        schema,
        fs,
        Path::from("/tmp/tonbo-flush-test"),
    ));
    let descriptor = SsTableDescriptor::new(SsTableId::new(1), 0);

    // The descriptor is not inspected afterwards, so it is moved rather than cloned.
    let result = db
        .flush_immutables_with_descriptor(sstable_cfg, descriptor)
        .await;
    assert!(matches!(result, Err(SsTableError::NoImmutableSegments)));
    assert_eq!(db.num_immutable_segments(), 0);
}

/// Sealing a single ingested batch and flushing it publishes manifest
/// transaction 1. The resulting latest version records exactly one SST, which
/// carries the descriptor's id and has no WAL segments attached.
///
/// NOTE(review): SSTables are written through `LocalFs` to the fixed path
/// `/tmp/tonbo-flush-ok`. The test therefore touches the real filesystem and
/// may leave files behind between runs; consider a per-run temporary directory.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn flush_publishes_manifest_version() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("v", DataType::Int32, false),
    ]));

    // Extract keys from column 0 (`id`).
    let key_projection =
        crate::extractor::projection_for_field(schema.clone(), 0).expect("extractor");
    let executor = Arc::new(NoopExecutor);
    let mode_config = DynModeConfig::new(schema.clone(), key_projection).expect("config");
    let mut inner: DbInner<InMemoryFs, NoopExecutor> = DB::new(mode_config, executor)
        .await
        .expect("db")
        .into_inner();
    // A threshold of one batch means the single ingest below seals one segment.
    inner.set_seal_policy(Arc::new(BatchesThreshold { batches: 1 }));

    let input: RecordBatch = build_batch(
        schema.clone(),
        vec![
            DynRow(vec![Some(DynCell::Str("a".into())), Some(DynCell::I32(1))]),
            DynRow(vec![Some(DynCell::Str("b".into())), Some(DynCell::I32(2))]),
        ],
    )
    .expect("valid dyn rows");
    inner.ingest(input).await.expect("ingest triggers seal");
    assert_eq!(inner.num_immutable_segments(), 1);

    let local_fs: Arc<dyn DynFs> = Arc::new(LocalFs {});
    let sst_config = Arc::new(SsTableConfig::new(
        schema,
        local_fs,
        Path::from("/tmp/tonbo-flush-ok"),
    ));
    // Kept around (hence the clone below) so its id can be compared against the manifest.
    let descriptor = SsTableDescriptor::new(SsTableId::new(7), 0);

    let flushed = inner
        .flush_immutables_with_descriptor(sst_config, descriptor.clone())
        .await
        .expect("flush succeeds");
    assert_eq!(inner.num_immutable_segments(), 0);

    let manifest_snapshot = inner
        .manifest
        .snapshot_latest(inner.manifest_table)
        .await
        .expect("manifest snapshot");
    assert_eq!(
        manifest_snapshot.head.last_manifest_txn,
        Some(Timestamp::new(1)),
        "first flush should publish manifest txn 1"
    );

    let version = manifest_snapshot
        .latest_version
        .expect("latest version must exist after flush");
    assert_eq!(
        version.commit_timestamp(),
        Timestamp::new(1),
        "latest version should reflect manifest txn 1"
    );

    // Exactly one level holding exactly one SST: the one just flushed.
    let levels = version.ssts();
    assert_eq!(levels.len(), 1);
    assert_eq!(levels[0].len(), 1);
    let recorded = &levels[0][0];
    assert_eq!(recorded.sst_id(), descriptor.id());

    // Implication: if the flushed table carried stats, the manifest entry must too.
    assert!(
        flushed.descriptor().stats().is_none() || recorded.stats().is_some(),
        "stats should propagate when available"
    );
    assert!(
        recorded.wal_segments().is_none(),
        "no WAL segments recorded since none were attached"
    );
}