tonbo 0.3.2

An embedded persistent KV database in Rust.
Documentation
#[cfg(all(test, feature = "opfs", target_arch = "wasm32"))]
mod tests {
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
    use std::{collections::BTreeMap, ops::Bound, sync::Arc};

    use fusio::{path::Path, DynFs};
    use futures::StreamExt;
    use tonbo::{
        executor::opfs::OpfsExecutor,
        record::{DataType, DynRecord, DynSchema, Record, Value, ValueDesc},
        DbOption, Projection, DB,
    };
    use wasm_bindgen_test::wasm_bindgen_test;

    fn test_dyn_item_schema() -> DynSchema {
        let descs = vec![
            ValueDesc::new("id".to_string(), DataType::Int64, false),
            ValueDesc::new("age".to_string(), DataType::Int8, true),
            ValueDesc::new("name".to_string(), DataType::String, false),
            ValueDesc::new("email".to_string(), DataType::String, true),
            ValueDesc::new("bytes".to_string(), DataType::Bytes, true),
        ];
        DynSchema::new(descs, 0)
    }

    fn test_dyn_items() -> Vec<DynRecord> {
        let mut items = vec![];
        for i in 0..50 {
            let columns = vec![
                Value::new(DataType::Int64, "id".to_string(), Arc::new(i as i64), false),
                Value::new(
                    DataType::Int8,
                    "age".to_string(),
                    Arc::new(Some(i as i8)),
                    true,
                ),
                Value::new(
                    DataType::String,
                    "name".to_string(),
                    Arc::new(i.to_string()),
                    false,
                ),
                Value::new(
                    DataType::String,
                    "email".to_string(),
                    Arc::new(Some(format!("{}@tonbo.io", i))),
                    true,
                ),
                Value::new(
                    DataType::Bytes,
                    "bytes".to_string(),
                    Arc::new(Some((i as i32).to_le_bytes().to_vec())),
                    true,
                ),
            ];

            items.push(DynRecord::new(columns, 0));
        }
        items
    }

    async fn remove(path: &str) {
        let path = Path::from_opfs_path(path).unwrap();
        let fs = fusio::disk::LocalFs {};

        fs.remove(&path).await.unwrap();
    }

    #[wasm_bindgen_test]
    async fn test_wasm_read_write() {
        let schema = test_dyn_item_schema();
        let path = Path::from_opfs_path("opfs_dir_rw").unwrap();
        let fs = fusio::disk::LocalFs {};
        fs.create_dir_all(&path).await.unwrap();

        let option = DbOption::new(Path::from_opfs_path("opfs_dir_rw").unwrap(), &schema);

        let db: DB<DynRecord, OpfsExecutor> =
            DB::new(option, OpfsExecutor::new(), schema).await.unwrap();

        for item in test_dyn_items().into_iter() {
            db.insert(item).await.unwrap();
        }

        // test get
        {
            let tx = db.transaction().await;

            for i in 0..50 {
                let key = Value::new(DataType::Int64, "id".to_string(), Arc::new(i as i64), false);
                let option1 = tx.get(&key, Projection::All).await.unwrap();
                let entry = option1.unwrap();
                let record_ref = entry.get();

                assert_eq!(
                    *record_ref
                        .columns
                        .first()
                        .unwrap()
                        .value
                        .as_ref()
                        .downcast_ref::<i64>()
                        .unwrap(),
                    i as i64
                );
                assert_eq!(
                    *record_ref
                        .columns
                        .get(2)
                        .unwrap()
                        .value
                        .as_ref()
                        .downcast_ref::<Option<String>>()
                        .unwrap(),
                    Some(i.to_string()),
                );
                assert_eq!(
                    *record_ref
                        .columns
                        .get(3)
                        .unwrap()
                        .value
                        .as_ref()
                        .downcast_ref::<Option<String>>()
                        .unwrap(),
                    Some(format!("{}@tonbo.io", i)),
                );
                assert_eq!(
                    *record_ref
                        .columns
                        .get(4)
                        .unwrap()
                        .value
                        .as_ref()
                        .downcast_ref::<Option<Vec<u8>>>()
                        .unwrap(),
                    Some((i as i32).to_le_bytes().to_vec()),
                );
            }
            tx.commit().await.unwrap();
        }
        db.flush_wal().await.unwrap();
        drop(db);

        remove("opfs_dir_rw").await;
    }

    #[wasm_bindgen_test]
    async fn test_wasm_transaction() {
        let schema = test_dyn_item_schema();

        let fs = fusio::disk::LocalFs {};
        let path = Path::from_opfs_path("opfs_dir_txn").unwrap();
        fs.create_dir_all(&path).await.unwrap();

        let option = DbOption::new(Path::from_opfs_path("opfs_dir_txn").unwrap(), &schema);

        let db: DB<DynRecord, OpfsExecutor> =
            DB::new(option, OpfsExecutor::new(), schema).await.unwrap();

        {
            let mut txn = db.transaction().await;
            for item in test_dyn_items().into_iter() {
                txn.insert(item);
            }
            txn.commit().await.unwrap();
        }

        // test scan
        {
            let txn = db.transaction().await;
            let lower = Value::new(DataType::Int64, "id".to_owned(), Arc::new(5_i64), false);
            let upper = Value::new(DataType::Int64, "id".to_owned(), Arc::new(47_i64), false);
            let mut scan = txn
                .scan((Bound::Included(&lower), Bound::Included(&upper)))
                .projection(&["id", "name", "bytes"])
                .take()
                .await
                .unwrap();

            let mut i = 5_i64;
            while let Some(entry) = scan.next().await.transpose().unwrap() {
                let columns = entry.value().unwrap().columns;

                let primary_key_col = columns.first().unwrap();
                assert_eq!(primary_key_col.datatype(), DataType::Int64);
                assert_eq!(primary_key_col.desc.name, "id".to_string());
                assert_eq!(
                    *primary_key_col
                        .value
                        .as_ref()
                        .downcast_ref::<i64>()
                        .unwrap(),
                    i
                );

                let col = columns.get(1).unwrap();
                assert_eq!(col.datatype(), DataType::Int8);
                assert_eq!(col.desc.name, "age".to_string());
                let age = col.value.as_ref().downcast_ref::<Option<i8>>();
                assert!(age.is_some());
                assert_eq!(age.unwrap(), &None);

                let col = columns.get(2).unwrap();
                assert_eq!(col.datatype(), DataType::String);
                assert_eq!(col.desc.name, "name".to_string());
                let name = col.value.as_ref().downcast_ref::<Option<String>>();
                assert!(name.is_some());
                assert_eq!(name.unwrap(), &Some(i.to_string()));

                let col = columns.get(4).unwrap();
                assert_eq!(col.datatype(), DataType::Bytes);
                assert_eq!(col.desc.name, "bytes".to_string());
                let bytes = col.value.as_ref().downcast_ref::<Option<Vec<u8>>>();
                assert!(bytes.is_some());
                assert_eq!(bytes.unwrap(), &Some((i as i32).to_le_bytes().to_vec()));
                i += 1
            }
            assert_eq!(i, 48);
        }
        db.flush_wal().await.unwrap();
        drop(db);

        remove(&"opfs_dir_txn").await;
    }

    #[wasm_bindgen_test]
    async fn test_wasm_schema_recover() {
        let schema = test_dyn_item_schema();
        let path = Path::from_opfs_path("opfs_dir").unwrap();
        let fs = fusio::disk::LocalFs {};
        fs.create_dir_all(&path).await.unwrap();

        let option = DbOption::new(Path::from_opfs_path("opfs_dir").unwrap(), &schema);

        {
            let db: DB<DynRecord, OpfsExecutor> =
                DB::new(option, OpfsExecutor::new(), schema).await.unwrap();

            for item in test_dyn_items().into_iter() {
                db.insert(item).await.unwrap();
            }

            db.flush_wal().await.unwrap();
        }

        let schema = test_dyn_item_schema();
        let option = DbOption::new(Path::from_opfs_path("opfs_dir").unwrap(), &schema);
        let db: DB<DynRecord, OpfsExecutor> =
            DB::new(option, OpfsExecutor::new(), schema).await.unwrap();

        let mut sort_items = BTreeMap::new();
        for item in test_dyn_items() {
            sort_items.insert(item.key(), item);
        }

        {
            let tx = db.transaction().await;
            let mut scan = tx
                .scan((Bound::Unbounded, Bound::Unbounded))
                .projection(&["id", "age", "name"])
                .take()
                .await
                .unwrap();

            while let Some(entry) = scan.next().await.transpose().unwrap() {
                let columns1 = entry.value().unwrap().columns;
                let (_, record) = sort_items.pop_first().unwrap();
                let columns2 = record.as_record_ref().columns;

                assert_eq!(columns1.len(), columns2.len());
                for i in 0..columns1.len() {
                    assert_eq!(columns1.get(i), columns2.get(i));
                }
            }
        }
        db.flush_wal().await.unwrap();
        drop(db);
        remove("opfs_dir").await;
    }

    #[ignore]
    #[wasm_bindgen_test]
    async fn test_s3_read_write() {
        use fusio::remotes::aws::AwsCredential;
        use fusio_dispatch::FsOptions;

        if option_env!("AWS_ACCESS_KEY_ID").is_none() {
            return;
        }
        let key_id = option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string();
        let secret_key = option_env!("AWS_SECRET_ACCESS_KEY").unwrap().to_string();

        let schema = test_dyn_item_schema();

        let fs_option = FsOptions::S3 {
            bucket: "wasm-data".to_string(),
            credential: Some(AwsCredential {
                key_id,
                secret_key,
                token: None,
            }),
            endpoint: None,
            sign_payload: None,
            checksum: None,
            region: Some("ap-southeast-2".to_string()),
        };

        let option = DbOption::new(Path::from_opfs_path("s3_rw").unwrap(), &schema)
            .level_path(
                0,
                Path::from_url_path("tonbo/l0").unwrap(),
                fs_option.clone(),
            )
            .unwrap()
            .level_path(
                1,
                Path::from_url_path("tonbo/l1").unwrap(),
                fs_option.clone(),
            )
            .unwrap()
            .level_path(2, Path::from_url_path("tonbo/l2").unwrap(), fs_option)
            .unwrap()
            .major_threshold_with_sst_size(3)
            .level_sst_magnification(1)
            .max_sst_file_size(1 * 1024);

        let db: DB<DynRecord, OpfsExecutor> =
            DB::new(option, OpfsExecutor::new(), schema).await.unwrap();

        for (i, item) in test_dyn_items().into_iter().enumerate() {
            db.insert(item).await.unwrap();
            if i % 5 == 0 {
                db.flush().await.unwrap();
            }
        }

        {
            let tx = db.transaction().await;

            let mut scan = tx
                .scan((Bound::Unbounded, Bound::Unbounded))
                .projection(&["id", "age", "name"])
                .take()
                .await
                .unwrap();

            let mut i = 0;
            while let Some(entry) = scan.next().await.transpose().unwrap() {
                let columns = entry.value().unwrap().columns;

                let primary_key_col = columns.first().unwrap();
                assert_eq!(primary_key_col.datatype(), DataType::Int64);
                assert_eq!(primary_key_col.desc.name, "id".to_string());
                assert_eq!(
                    *primary_key_col
                        .value
                        .as_ref()
                        .downcast_ref::<i64>()
                        .unwrap(),
                    i
                );

                let col = columns.get(1).unwrap();
                assert_eq!(col.datatype(), DataType::Int8);
                assert_eq!(col.desc.name, "age".to_string());
                let age = col.value.as_ref().downcast_ref::<Option<i8>>();
                assert!(age.is_some());
                assert_eq!(age.unwrap(), &Some(i as i8));

                let col = columns.get(2).unwrap();
                assert_eq!(col.datatype(), DataType::String);
                assert_eq!(col.desc.name, "name".to_string());
                let name = col.value.as_ref().downcast_ref::<Option<String>>();
                assert!(name.is_some());
                assert_eq!(name.unwrap(), &Some(i.to_string()));

                let col = columns.get(4).unwrap();
                assert_eq!(col.datatype(), DataType::Bytes);
                assert_eq!(col.desc.name, "bytes".to_string());
                let bytes = col.value.as_ref().downcast_ref::<Option<Vec<u8>>>();
                assert!(bytes.unwrap().is_none());
                i += 1
            }
        }
        drop(db);

        remove("s3_rw").await;
    }
}