alopex-server 0.4.1

Server component for Alopex DB
Documentation
#![cfg(not(target_arch = "wasm32"))]

use std::future::poll_fn;
use std::sync::{Arc, RwLock};

use alopex_core::kv::memory::MemoryKV;
use alopex_sql::ast::expr::Literal;
use alopex_sql::catalog::{ColumnMetadata, PersistentCatalog, TableMetadata};
use alopex_sql::executor::{ExecutionResult, Executor, ExecutorError};
use alopex_sql::planner::typed_expr::{ProjectedColumn, TypedExpr};
use alopex_sql::planner::types::ResolvedType;
use alopex_sql::{
    AsyncResult, AsyncRowStream, AsyncSqlTransaction, AsyncTxnBridge, LogicalPlan, Projection,
    Span, SqlValue, TokioAsyncTxnBridge,
};

async fn next_stream_item(
    stream: &mut AsyncRowStream<'_>,
) -> Option<AsyncResult<alopex_sql::executor::Row>> {
    poll_fn(|cx| futures_core::Stream::poll_next(stream.as_mut(), cx)).await
}

async fn collect_ok_rows(stream: &mut AsyncRowStream<'_>) -> AsyncResult<Vec<Vec<SqlValue>>> {
    let mut rows = Vec::new();
    while let Some(item) = next_stream_item(stream).await {
        let row = item?;
        rows.push(row.values);
    }
    Ok(rows)
}

fn sample_table(name: &str) -> TableMetadata {
    TableMetadata::new(
        name,
        vec![
            ColumnMetadata::new("id", ResolvedType::Integer)
                .with_primary_key(true)
                .with_not_null(true),
            ColumnMetadata::new("name", ResolvedType::Text).with_not_null(true),
        ],
    )
    .with_primary_key(vec!["id".into()])
}

fn insert_row(table: &str, id: i64, name: &str) -> LogicalPlan {
    LogicalPlan::Insert {
        table: table.into(),
        columns: vec!["id".into(), "name".into()],
        values: vec![vec![
            TypedExpr::literal(
                Literal::Number(id.to_string()),
                ResolvedType::Integer,
                Span::default(),
            ),
            TypedExpr::literal(
                Literal::String(name.to_string()),
                ResolvedType::Text,
                Span::default(),
            ),
        ]],
    }
}

#[tokio::test]
async fn async_txn_commit_and_rollback_integrate() {
    let store = Arc::new(MemoryKV::new());
    let catalog = Arc::new(RwLock::new(PersistentCatalog::new(store.clone())));
    let bridge = TokioAsyncTxnBridge::new(store.clone(), catalog.clone());

    let mut txn = bridge.begin_write().await.unwrap();
    let create = LogicalPlan::CreateTable {
        table: sample_table("sessions"),
        if_not_exists: false,
        with_options: vec![],
    };
    txn.execute_plan(create).await.unwrap();
    txn.execute_plan(insert_row("sessions", 1, "alpha"))
        .await
        .unwrap();
    txn.commit().await.unwrap();

    let mut executor = Executor::new(store.clone(), catalog.clone());
    let result = executor
        .execute(LogicalPlan::Scan {
            table: "sessions".into(),
            projection: Projection::All(vec!["id".into(), "name".into()]),
        })
        .unwrap();
    let ExecutionResult::Query(query) = result else {
        panic!("expected query result");
    };
    assert_eq!(
        query.rows,
        vec![vec![SqlValue::Integer(1), SqlValue::Text("alpha".into())]]
    );

    let mut txn = bridge.begin_write().await.unwrap();
    let create = LogicalPlan::CreateTable {
        table: sample_table("rollbacked"),
        if_not_exists: false,
        with_options: vec![],
    };
    txn.execute_plan(create).await.unwrap();
    txn.rollback().await.unwrap();

    let err = executor.execute(LogicalPlan::Scan {
        table: "rollbacked".into(),
        projection: Projection::All(vec!["id".into(), "name".into()]),
    });
    assert!(matches!(err, Err(ExecutorError::TableNotFound(_))));
}

#[tokio::test]
async fn async_streaming_matches_sync_results() {
    let store = Arc::new(MemoryKV::new());
    let catalog = Arc::new(RwLock::new(PersistentCatalog::new(store.clone())));

    let mut executor = Executor::new(store.clone(), catalog.clone());
    executor
        .execute(LogicalPlan::CreateTable {
            table: sample_table("users"),
            if_not_exists: false,
            with_options: vec![],
        })
        .unwrap();
    executor.execute(insert_row("users", 1, "alice")).unwrap();
    executor.execute(insert_row("users", 2, "bob")).unwrap();

    let sync_result = executor
        .execute(LogicalPlan::Scan {
            table: "users".into(),
            projection: Projection::All(vec!["id".into(), "name".into()]),
        })
        .unwrap();
    let ExecutionResult::Query(sync_query) = sync_result else {
        panic!("expected query result");
    };

    let bridge = TokioAsyncTxnBridge::new(store, catalog);
    let mut txn = bridge.begin_read().await.unwrap();
    let mut stream = txn
        .query_stream(LogicalPlan::Scan {
            table: "users".into(),
            projection: Projection::All(vec!["id".into(), "name".into()]),
        })
        .await
        .unwrap();
    let async_rows = collect_ok_rows(&mut stream).await.unwrap();
    assert_eq!(async_rows, sync_query.rows);
    drop(stream);
    txn.commit().await.unwrap();
}

#[tokio::test]
async fn async_streaming_propagates_errors() {
    let store = Arc::new(MemoryKV::new());
    let catalog = Arc::new(RwLock::new(PersistentCatalog::new(store.clone())));

    let mut executor = Executor::new(store.clone(), catalog.clone());
    executor
        .execute(LogicalPlan::CreateTable {
            table: sample_table("items"),
            if_not_exists: false,
            with_options: vec![],
        })
        .unwrap();
    executor.execute(insert_row("items", 1, "first")).unwrap();

    let invalid_projection = Projection::Columns(vec![ProjectedColumn {
        expr: TypedExpr::column_ref(
            "items".to_string(),
            "missing".to_string(),
            9,
            ResolvedType::Integer,
            Span::default(),
        ),
        alias: None,
    }]);

    let bridge = TokioAsyncTxnBridge::new(store, catalog);
    let mut txn = bridge.begin_read().await.unwrap();
    let mut stream = txn
        .query_stream(LogicalPlan::Scan {
            table: "items".into(),
            projection: invalid_projection,
        })
        .await
        .unwrap();
    let first = next_stream_item(&mut stream).await.unwrap();
    assert!(first.is_err());
    drop(stream);
    txn.commit().await.unwrap();
}