aedb 0.1.10

Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads
Documentation
use aedb::AedbInstance;
use aedb::catalog::DdlOperation;
use aedb::catalog::schema::ColumnDef;
use aedb::catalog::types::{ColumnType, Row, Value};
use aedb::commit::validation::Mutation;
use aedb::config::AedbConfig;
use aedb::query::plan::{ConsistencyMode, Expr, Query};
use tempfile::tempdir;

async fn setup_users(db: &AedbInstance) {
    db.create_project("p").await.expect("create project");
    db.create_scope("p", "app").await.expect("create scope");
    db.commit(Mutation::Ddl(DdlOperation::CreateTable {
        owner_id: None,
        project_id: "p".into(),
        scope_id: "app".into(),
        table_name: "users".into(),
        if_not_exists: false,
        columns: vec![
            ColumnDef {
                name: "id".into(),
                col_type: ColumnType::Integer,
                nullable: false,
            },
            ColumnDef {
                name: "status".into(),
                col_type: ColumnType::Text,
                nullable: false,
            },
        ],
        primary_key: vec!["id".into()],
    }))
    .await
    .expect("create table");
}

#[tokio::test]
async fn delete_where_deletes_matching_rows_with_limit() {
    let dir = tempdir().expect("tempdir");
    let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open");
    setup_users(&db).await;

    for (id, status) in [
        (1, "revoked"),
        (2, "revoked"),
        (3, "revoked"),
        (4, "active"),
    ] {
        db.commit(Mutation::Upsert {
            project_id: "p".into(),
            scope_id: "app".into(),
            table_name: "users".into(),
            primary_key: vec![Value::Integer(id)],
            row: Row::from_values(vec![Value::Integer(id), Value::Text(status.into())]),
        })
        .await
        .expect("seed");
    }

    let result = db
        .delete_where(
            "p",
            "app",
            "users",
            Expr::Eq("status".into(), Value::Text("revoked".into())),
            Some(2),
        )
        .await
        .expect("delete_where");
    assert!(result.is_some());

    let remaining = db
        .query(
            "p",
            "app",
            Query::select(&["id"])
                .from("users")
                .where_(Expr::Eq("status".into(), Value::Text("revoked".into())))
                .order_by("id", aedb::query::plan::Order::Asc)
                .limit(10),
        )
        .await
        .expect("query");
    assert_eq!(remaining.rows.len(), 1);
    assert_eq!(remaining.rows[0].values[0], Value::Integer(3));
}

#[tokio::test]
async fn update_where_updates_matching_rows() {
    let dir = tempdir().expect("tempdir");
    let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open");
    setup_users(&db).await;

    for (id, status) in [(1, "expired"), (2, "expired"), (3, "active")] {
        db.commit(Mutation::Upsert {
            project_id: "p".into(),
            scope_id: "app".into(),
            table_name: "users".into(),
            primary_key: vec![Value::Integer(id)],
            row: Row::from_values(vec![Value::Integer(id), Value::Text(status.into())]),
        })
        .await
        .expect("seed");
    }

    let result = db
        .update_where(
            "p",
            "app",
            "users",
            Expr::Eq("status".into(), Value::Text("expired".into())),
            vec![("status".into(), Value::Text("revoked".into()))],
            None,
        )
        .await
        .expect("update_where");
    assert!(result.is_some());

    let revoked = db
        .query(
            "p",
            "app",
            Query::select(&["id"])
                .from("users")
                .where_(Expr::Eq("status".into(), Value::Text("revoked".into())))
                .order_by("id", aedb::query::plan::Order::Asc)
                .limit(10),
        )
        .await
        .expect("query");
    assert_eq!(revoked.rows.len(), 2);
}

#[tokio::test]
async fn commit_many_atomic_applies_mutation_set() {
    let dir = tempdir().expect("tempdir");
    let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open");
    setup_users(&db).await;

    let result = db
        .commit_many_atomic(vec![
            Mutation::Upsert {
                project_id: "p".into(),
                scope_id: "app".into(),
                table_name: "users".into(),
                primary_key: vec![Value::Integer(10)],
                row: Row::from_values(vec![Value::Integer(10), Value::Text("active".into())]),
            },
            Mutation::Upsert {
                project_id: "p".into(),
                scope_id: "app".into(),
                table_name: "users".into(),
                primary_key: vec![Value::Integer(11)],
                row: Row::from_values(vec![Value::Integer(11), Value::Text("active".into())]),
            },
        ])
        .await
        .expect("commit_many_atomic");
    assert!(result.commit_seq > 0);

    let all = db
        .query(
            "p",
            "app",
            Query::select(&["id"])
                .from("users")
                .order_by("id", aedb::query::plan::Order::Asc)
                .limit(10),
        )
        .await
        .expect("query");
    assert_eq!(all.rows.len(), 2);
}

#[tokio::test]
async fn query_page_stable_uses_pk_order_for_cursor_paging() {
    let dir = tempdir().expect("tempdir");
    let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open");
    setup_users(&db).await;

    for id in [3, 1, 2] {
        db.commit(Mutation::Upsert {
            project_id: "p".into(),
            scope_id: "app".into(),
            table_name: "users".into(),
            primary_key: vec![Value::Integer(id)],
            row: Row::from_values(vec![Value::Integer(id), Value::Text("active".into())]),
        })
        .await
        .expect("seed");
    }

    let page1 = db
        .query_page_stable(
            "p",
            "app",
            Query::select(&["id"]).from("users"),
            None,
            2,
            ConsistencyMode::AtLatest,
        )
        .await
        .expect("page1");
    assert_eq!(page1.rows.len(), 2);
    assert_eq!(page1.rows[0].values[0], Value::Integer(1));
    assert_eq!(page1.rows[1].values[0], Value::Integer(2));
    assert!(page1.cursor.is_some());

    let page2 = db
        .query_page_stable(
            "p",
            "app",
            Query::select(&["id"]).from("users"),
            page1.cursor.clone(),
            2,
            ConsistencyMode::AtLatest,
        )
        .await
        .expect("page2");
    assert_eq!(page2.rows.len(), 1);
    assert_eq!(page2.rows[0].values[0], Value::Integer(3));

    let mut tampered = page1.cursor.expect("cursor");
    let last = tampered.pop().expect("cursor char");
    tampered.push(if last == '0' { '1' } else { '0' });
    let err = db
        .query_page_stable(
            "p",
            "app",
            Query::select(&["id"]).from("users"),
            Some(tampered),
            2,
            ConsistencyMode::AtLatest,
        )
        .await
        .expect_err("tampered cursor must be rejected");
    assert!(
        err.to_string().contains("cursor"),
        "unexpected cursor error: {err}"
    );
}