alopex-server 0.5.0

Server component for Alopex DB
Documentation
#![cfg(not(target_arch = "wasm32"))]

use std::sync::Arc;

use alopex_server::config::ServerConfig;
use alopex_server::http;
use alopex_server::server::ServerState;
use alopex_server::Server;
use axum::body::Body;
use axum::http::{Request, StatusCode};
use hyper::body::to_bytes;
use serde_json::Value;
use tower::ServiceExt;

/// Builds a server state for tests, storing its data under a fresh temporary
/// directory.
///
/// Audit logging, tracing and metrics are switched off; every other setting
/// comes from `ServerConfig::default()`.
///
/// The temporary-directory guard is returned next to the state so the caller
/// decides how long the directory lives (tempfile removes it when the guard
/// is dropped).
///
/// # Panics
///
/// Panics if the temporary directory or the server cannot be created.
fn test_state() -> (Arc<ServerState>, tempfile::TempDir) {
    let scratch = tempfile::tempdir().expect("temp dir");
    let data_dir = scratch.path().join("data");

    let server = Server::new(ServerConfig {
        data_dir,
        audit_log_enabled: false,
        tracing_enabled: false,
        metrics_enabled: false,
        ..ServerConfig::default()
    })
    .expect("server");

    let state = server.state.clone();
    (state, scratch)
}

/// Sends `body` as a JSON `POST` to `uri` on a clone of `app`.
///
/// Returns the response status together with the full response body decoded
/// as UTF-8 text.
///
/// # Panics
///
/// Panics if the request cannot be built, the router call fails, the body
/// cannot be collected, or the body is not valid UTF-8.
async fn send_json(app: &axum::Router, uri: &str, body: Value) -> (StatusCode, String) {
    let payload = Body::from(body.to_string());
    let request = Request::builder()
        .method("POST")
        .uri(uri)
        .header("content-type", "application/json")
        .body(payload)
        .expect("request");

    // The router is cloned because `oneshot` consumes the service it runs on.
    let response = app.clone().oneshot(request).await.expect("response");
    let (head, stream) = response.into_parts();

    let raw = to_bytes(stream).await.expect("body");
    let text = String::from_utf8(raw.to_vec()).expect("utf8");
    (head.status, text)
}

/// Sends a `POST` with an empty body to `uri` on a clone of `app`.
///
/// Returns the response status together with the full response body decoded
/// as UTF-8 text.
///
/// # Panics
///
/// Panics if the request cannot be built, the router call fails, the body
/// cannot be collected, or the body is not valid UTF-8.
async fn send_empty(app: &axum::Router, uri: &str) -> (StatusCode, String) {
    let request = Request::builder()
        .method("POST")
        .uri(uri)
        .body(Body::empty())
        .expect("request");

    // The router is cloned because `oneshot` consumes the service it runs on.
    let (head, stream) = app
        .clone()
        .oneshot(request)
        .await
        .expect("response")
        .into_parts();

    let raw = to_bytes(stream).await.expect("body");
    let text = String::from_utf8(raw.to_vec()).expect("utf8");
    (head.status, text)
}

#[tokio::test]
async fn http_session_commit_and_rollback() {
    let (state, _temp_dir) = test_state();
    let app = http::router(state);

    let create = serde_json::json!({
        "sql": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
        "streaming": false
    });
    let (status, _) = send_json(&app, "/sql", create).await;
    assert_eq!(status, StatusCode::OK);

    let (status, body) = send_empty(&app, "/session/begin").await;
    assert_eq!(status, StatusCode::OK);
    let session = serde_json::from_str::<Value>(&body).expect("session");
    let session_id = session["session_id"].as_str().expect("session_id");

    let insert = serde_json::json!({
        "sql": "INSERT INTO users (id, name) VALUES (1, 'alice')",
        "session_id": session_id,
        "streaming": false
    });
    let (status, _) = send_json(&app, "/sql", insert).await;
    assert_eq!(status, StatusCode::OK);

    let (status, _) = send_empty(&app, &format!("/session/{session_id}/commit")).await;
    assert_eq!(status, StatusCode::OK);

    let select = serde_json::json!({
        "sql": "SELECT id, name FROM users",
        "streaming": false
    });
    let (status, body) = send_json(&app, "/sql", select).await;
    assert_eq!(status, StatusCode::OK);
    let response = serde_json::from_str::<Value>(&body).expect("select");
    let rows = response["rows"].as_array().expect("rows");
    assert_eq!(rows.len(), 1);

    let (status, body) = send_empty(&app, "/session/begin").await;
    assert_eq!(status, StatusCode::OK);
    let session = serde_json::from_str::<Value>(&body).expect("session");
    let rollback_id = session["session_id"].as_str().expect("session_id");

    let insert = serde_json::json!({
        "sql": "INSERT INTO users (id, name) VALUES (2, 'ghost')",
        "session_id": rollback_id,
        "streaming": false
    });
    let (status, _) = send_json(&app, "/sql", insert).await;
    assert_eq!(status, StatusCode::OK);

    let (status, _) = send_empty(&app, &format!("/session/{rollback_id}/rollback")).await;
    assert_eq!(status, StatusCode::OK);

    let select = serde_json::json!({
        "sql": "SELECT id, name FROM users",
        "streaming": false
    });
    let (status, body) = send_json(&app, "/sql", select).await;
    assert_eq!(status, StatusCode::OK);
    let response = serde_json::from_str::<Value>(&body).expect("select");
    let rows = response["rows"].as_array().expect("rows");
    assert_eq!(rows.len(), 1);
}

/// Checks the streaming mode of the `/sql` endpoint: a valid `SELECT` must
/// produce at least one line carrying a `row` array and one line with
/// `done: true`, and a `SELECT` on a column the table does not define must
/// produce a line with a non-null `error` field (with a 200 status in both
/// cases).
#[tokio::test]
async fn http_streaming_select_and_error_propagation() {
    let (state, _temp_dir) = test_state();
    let app = http::router(state);

    // Fixture: one table holding a single row, created through non-streaming requests.
    let ddl = serde_json::json!({
        "sql": "CREATE TABLE stream_users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)",
        "streaming": false
    });
    let (ddl_status, _) = send_json(&app, "/sql", ddl).await;
    assert_eq!(ddl_status, StatusCode::OK);

    let seed = serde_json::json!({
        "sql": "INSERT INTO stream_users (id, name) VALUES (1, 'alpha')",
        "streaming": false
    });
    let (seed_status, _) = send_json(&app, "/sql", seed).await;
    assert_eq!(seed_status, StatusCode::OK);

    // Successful stream: every line is parsed as one JSON item.
    let good_query = serde_json::json!({
        "sql": "SELECT id, name FROM stream_users",
        "streaming": true
    });
    let (good_status, good_body) = send_json(&app, "/sql", good_query).await;
    assert_eq!(good_status, StatusCode::OK);

    // Collected eagerly so that every line is parsed, even after both markers were seen.
    let items: Vec<Value> = good_body
        .lines()
        .map(|line| serde_json::from_str::<Value>(line).expect("stream item"))
        .collect();
    let saw_row = items.iter().any(|item| item["row"].is_array());
    let saw_done = items
        .iter()
        .any(|item| matches!(item["done"].as_bool(), Some(true)));
    assert!(saw_row);
    assert!(saw_done);

    // Failing stream: the error is reported inside the body, not via the status.
    let bad_query = serde_json::json!({
        "sql": "SELECT missing FROM stream_users",
        "streaming": true
    });
    let (bad_status, bad_body) = send_json(&app, "/sql", bad_query).await;
    assert_eq!(bad_status, StatusCode::OK);

    // Lazy on purpose: parsing stops at the first item carrying an error.
    let saw_error = bad_body
        .lines()
        .map(|line| serde_json::from_str::<Value>(line).expect("stream item"))
        .any(|item| !item["error"].is_null());
    assert!(saw_error);
}