aedb 0.1.10

Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads
Documentation
use aedb::AedbInstance;
use aedb::catalog::DdlOperation;
use aedb::catalog::schema::ColumnDef;
use aedb::catalog::types::{ColumnType, Row, Value};
use aedb::commit::executor::CommitResult;
use aedb::commit::validation::Mutation;
use aedb::config::AedbConfig;
use aedb::query::plan::{Expr, Order, Query};
use aedb::repository::{
    Page, PageRequest, RepositoryContext, RepositoryError, RowDecodeError, TryFromRow, bool_at,
    text_at, timestamp_at,
};
use tempfile::tempdir;

#[derive(Debug, Clone, PartialEq, Eq)]
struct Session {
    id: String,
    user_id: String,
    expires_at: i64,
    revoked: bool,
}

impl TryFromRow for Session {
    fn try_from_row(row: Row) -> Result<Self, RowDecodeError> {
        Ok(Self {
            id: text_at(&row, 0, "id")?.to_string(),
            user_id: text_at(&row, 1, "user_id")?.to_string(),
            expires_at: timestamp_at(&row, 2, "expires_at")?,
            revoked: bool_at(&row, 3, "revoked")?,
        })
    }
}

struct SessionRepo<'a> {
    ctx: RepositoryContext<'a>,
}

impl<'a> SessionRepo<'a> {
    fn new(ctx: RepositoryContext<'a>) -> Self {
        Self { ctx }
    }

    async fn upsert_session(
        &self,
        session: &Session,
    ) -> Result<CommitResult, aedb::error::AedbError> {
        self.ctx
            .commit_checked(Mutation::Upsert {
                project_id: self.ctx.project_id().to_string(),
                scope_id: self.ctx.scope_id().to_string(),
                table_name: "sessions".to_string(),
                primary_key: vec![Value::Text(session.id.clone().into())],
                row: Row::from_values(vec![
                    Value::Text(session.id.clone().into()),
                    Value::Text(session.user_id.clone().into()),
                    Value::Timestamp(session.expires_at),
                    Value::Boolean(session.revoked),
                ]),
            })
            .await
    }

    async fn list_active_for_user(
        &self,
        user_id: &str,
        now_ts: i64,
        page: PageRequest,
    ) -> Result<Page<Session>, RepositoryError> {
        let query = Query::select(&["id", "user_id", "expires_at", "revoked"])
            .from("sessions")
            .where_(Expr::And(
                Box::new(Expr::Eq(
                    "user_id".into(),
                    Value::Text(user_id.to_string().into()),
                )),
                Box::new(Expr::And(
                    Box::new(Expr::Eq("revoked".into(), Value::Boolean(false))),
                    Box::new(Expr::Gt("expires_at".into(), Value::Timestamp(now_ts))),
                )),
            ))
            .order_by("expires_at", Order::Asc)
            .order_by("id", Order::Asc);

        self.ctx.query_page(query, page).await
    }
}

#[tokio::test]
async fn example_session_repo_usage() {
    let dir = tempdir().expect("tempdir");
    let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open");

    db.create_project("p").await.expect("create project");
    db.create_scope("p", "auth").await.expect("create scope");

    db.commit(Mutation::Ddl(DdlOperation::CreateTable {
        owner_id: None,
        project_id: "p".into(),
        scope_id: "auth".into(),
        table_name: "sessions".into(),
        if_not_exists: false,
        columns: vec![
            ColumnDef {
                name: "id".into(),
                col_type: ColumnType::Text,
                nullable: false,
            },
            ColumnDef {
                name: "user_id".into(),
                col_type: ColumnType::Text,
                nullable: false,
            },
            ColumnDef {
                name: "expires_at".into(),
                col_type: ColumnType::Timestamp,
                nullable: false,
            },
            ColumnDef {
                name: "revoked".into(),
                col_type: ColumnType::Boolean,
                nullable: false,
            },
        ],
        primary_key: vec!["id".into()],
    }))
    .await
    .expect("create table");

    let repo = SessionRepo::new(RepositoryContext::new(&db, "p", "auth"));
    repo.upsert_session(&Session {
        id: "s1".into(),
        user_id: "u1".into(),
        expires_at: 200,
        revoked: false,
    })
    .await
    .expect("insert s1");
    repo.upsert_session(&Session {
        id: "s2".into(),
        user_id: "u1".into(),
        expires_at: 300,
        revoked: false,
    })
    .await
    .expect("insert s2");
    repo.upsert_session(&Session {
        id: "s3".into(),
        user_id: "u1".into(),
        expires_at: 150,
        revoked: true,
    })
    .await
    .expect("insert revoked");

    let first = repo
        .list_active_for_user("u1", 100, PageRequest::new(1))
        .await
        .expect("first page");
    assert_eq!(first.items.len(), 1);
    assert_eq!(first.items[0].id, "s1");
    assert!(first.next_cursor.is_some());

    let second = repo
        .list_active_for_user(
            "u1",
            100,
            PageRequest::new(1).with_cursor(first.next_cursor.expect("cursor")),
        )
        .await
        .expect("second page");
    assert_eq!(second.items.len(), 1);
    assert_eq!(second.items[0].id, "s2");
    assert!(second.next_cursor.is_none());
}