// alopex-server 0.5.0
//
// Server component for Alopex DB
// Documentation
use std::sync::Arc;
use std::time::{Duration, Instant};

use alopex_server::auth::AuthMode;
use alopex_server::config::ServerConfig;
use alopex_server::http;
use alopex_server::server::ServerState;
use alopex_server::Server;
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
use futures::future::join_all;
use serde_json::{json, Value};
use tempfile::tempdir;
use tokio::time::sleep;
use tower::ServiceExt;

/// Creates a server backed by a fresh temporary data directory and returns its shared state.
///
/// The configuration uses `AuthMode::None`, disables the audit log, applies the
/// supplied `query_timeout`, and takes every other setting from `ServerConfig::default()`.
///
/// The `TempDir` handle is returned alongside the state so the caller decides how
/// long the directory stays alive (callers in this file bind it to `_temp`).
///
/// # Panics
///
/// Panics if the temporary directory cannot be created or `Server::new` fails.
async fn build_state(query_timeout: Duration) -> (Arc<ServerState>, tempfile::TempDir) {
    let data_root = tempdir().expect("tempdir");
    let server = Server::new(ServerConfig {
        data_dir: data_root.path().to_path_buf(),
        auth_mode: AuthMode::None,
        query_timeout,
        audit_log_enabled: false,
        ..ServerConfig::default()
    })
    .expect("server");
    (server.state, data_root)
}

/// Sends `body` as a JSON `POST` to `path` on `router` and collects the whole response.
///
/// Returns the response status together with the fully buffered response body bytes.
///
/// # Panics
///
/// Panics if the request cannot be built, the router call fails, or the body
/// cannot be read to completion.
async fn send_json(router: axum::Router, path: &str, body: Value) -> (StatusCode, Vec<u8>) {
    let payload = Body::from(body.to_string());
    let request = Request::post(path)
        .header(axum::http::header::CONTENT_TYPE, "application/json")
        .body(payload)
        .expect("request");

    let (parts, stream) = router
        .oneshot(request)
        .await
        .expect("response")
        .into_parts();
    let bytes = hyper::body::to_bytes(stream).await.expect("body");
    (parts.status, bytes.to_vec())
}

/// Issues a body-less `GET` for `path` against `router` and returns only the status code.
///
/// The response body is not read.
///
/// # Panics
///
/// Panics if the request cannot be built or the router call fails.
async fn send_get(router: axum::Router, path: &str) -> StatusCode {
    let request = Request::get(path).body(Body::empty()).expect("request");
    router.oneshot(request).await.expect("response").status()
}

/// Extracts the value of the metric `name` from a Prometheus-style text dump.
///
/// Each line is treated as `<metric>[{labels}] <value> ...`; the value is the second
/// whitespace-separated token and must parse as `f64`. Lines whose value is missing
/// or unparsable are skipped.
///
/// Lookup order:
/// 1. The first line where `name` is followed directly by whitespace or `{`
///    (i.e. the metric name matches exactly) wins.
/// 2. Otherwise the first line that merely *starts with* `name` is used, so callers
///    that pass a name prefix keep working.
///
/// Preferring the exact match avoids returning the sample of a longer-named metric
/// (for example `foo_bar`) when `foo` is requested and both are present.
///
/// Returns `None` when no line qualifies.
fn parse_metric(metrics: &str, name: &str) -> Option<f64> {
    let mut prefix_fallback = None;
    for line in metrics.lines() {
        let rest = match line.strip_prefix(name) {
            Some(rest) => rest,
            None => continue,
        };
        let value = match line
            .split_whitespace()
            .nth(1)
            .and_then(|raw| raw.parse::<f64>().ok())
        {
            Some(value) => value,
            None => continue,
        };
        // An exact name is terminated by the label block or by the separator
        // before the sample value.
        if rest.starts_with(|c: char| c == '{' || c.is_whitespace()) {
            return Some(value);
        }
        // Remember only the earliest prefix match; an exact match found later
        // still takes precedence via the early return above.
        if prefix_fallback.is_none() {
            prefix_fallback = Some(value);
        }
    }
    prefix_fallback
}

/// Fires 1000 `GET /healthz` requests at the admin router via `join_all` and checks
/// that all succeed, that throughput meets a floor, and that the
/// `active_connections` metric returns to zero afterwards.
///
/// The throughput floor (ops/sec) is read from `ALOPEX_LOAD_TEST_MIN_OPS`; an unset
/// or unparsable value falls back to `50.0`.
#[tokio::test]
async fn load_test_concurrent_requests() {
    const REQUEST_COUNT: usize = 1000;

    let (state, _temp) = build_state(Duration::from_secs(5)).await;
    let router = http::admin_router(state.clone());

    let started = Instant::now();
    let statuses = join_all((0..REQUEST_COUNT).map(|_| send_get(router.clone(), "/healthz"))).await;
    let elapsed = started.elapsed();
    assert!(statuses.into_iter().all(|status| status == StatusCode::OK));

    let min_ops = match std::env::var("ALOPEX_LOAD_TEST_MIN_OPS") {
        Ok(raw) => raw.parse::<f64>().unwrap_or(50.0),
        Err(_) => 50.0,
    };
    // Clamp the divisor so an extremely fast run cannot divide by zero.
    let seconds = elapsed.as_secs_f64().max(0.000_001);
    let ops_per_sec = REQUEST_COUNT as f64 / seconds;
    assert!(
        ops_per_sec >= min_ops,
        "throughput too low: {ops_per_sec:.2} ops/sec < {min_ops:.2}"
    );

    // Poll the gauge up to ten times, 50 ms apart, until it reads zero or below.
    let mut last_seen: Option<f64> = None;
    let mut attempts_left = 10;
    while attempts_left > 0 {
        attempts_left -= 1;
        let exposition = state.metrics.expose_prometheus().expect("metrics");
        last_seen = parse_metric(&exposition, "active_connections");
        match last_seen {
            Some(value) if value <= 0.0 => break,
            _ => sleep(Duration::from_millis(50)).await,
        }
    }
    let active_connections = last_seen.expect("active_connections metric");
    assert!(
        active_connections <= 0.0,
        "active_connections should be 0 after load, got {active_connections}"
    );
}

/// Checks that a streaming query whose response body is left unread increases the
/// `stream_backpressure` metric.
///
/// Steps: create the `items` table, insert 2000 rows in one statement, record the
/// metric, start a streaming `SELECT`, wait 200 ms without reading the body, and
/// assert the metric grew. The body is drained at the end.
#[tokio::test]
async fn load_test_backpressure_with_slow_client() {
    let (state, _temp) = build_state(Duration::from_secs(5)).await;
    let router = http::router(state.clone());

    let create_table = json!({
        "sql": "CREATE TABLE items (id INT PRIMARY KEY, value TEXT);"
    });
    let (create_status, _) = send_json(router.clone(), "/sql", create_table).await;
    assert_eq!(create_status, StatusCode::OK);

    // One multi-row INSERT: "(0, 'v0'), (1, 'v1'), ...".
    let values = (0..2000)
        .map(|id| format!("({}, 'v{}')", id, id))
        .collect::<Vec<_>>()
        .join(", ");
    let insert_sql = format!("INSERT INTO items (id, value) VALUES {values};");
    let (insert_status, _) =
        send_json(router.clone(), "/sql", json!({ "sql": insert_sql })).await;
    assert_eq!(insert_status, StatusCode::OK);

    let exposition_before = state.metrics.expose_prometheus().expect("metrics");
    // A metric that has not been emitted yet is treated as zero.
    let before = parse_metric(&exposition_before, "stream_backpressure").unwrap_or(0.0);

    let streaming_query = json!({
        "sql": "SELECT id FROM items ORDER BY id;",
        "streaming": true
    });
    let request = Request::builder()
        .method(Method::POST)
        .uri("/sql")
        .header(axum::http::header::CONTENT_TYPE, "application/json")
        .body(Body::from(streaming_query.to_string()))
        .expect("request");
    let response = router.oneshot(request).await.expect("response");
    assert_eq!(response.status(), StatusCode::OK);

    // Act as a slow client: hold the response without reading its body.
    sleep(Duration::from_millis(200)).await;
    let exposition_after = state.metrics.expose_prometheus().expect("metrics");
    let after = parse_metric(&exposition_after, "stream_backpressure").unwrap_or(0.0);
    assert!(after > before);

    // Drain the body before the test ends.
    let _ = hyper::body::to_bytes(response.into_body())
        .await
        .expect("body");
}

/// Checks that a streaming query run with a zero query timeout reports
/// `QUERY_TIMEOUT` inside the response stream.
///
/// The HTTP status is still expected to be `200 OK`. The body is read as
/// newline-separated JSON values, and at least one must carry an `error` object
/// whose `code` is `"QUERY_TIMEOUT"`.
#[tokio::test]
async fn load_test_timeout_cancels_stream() {
    let (state, _temp) = build_state(Duration::from_millis(0)).await;
    let router = http::router(state.clone());

    let query = json!({
        "sql": "SELECT 1;",
        "streaming": true
    });
    let (status, body) = send_json(router, "/sql", query).await;
    assert_eq!(status, StatusCode::OK);

    let text = String::from_utf8(body).expect("utf8");
    // `any` short-circuits, so lines after the first timeout entry are not parsed.
    let saw_timeout = text
        .lines()
        .filter(|line| !line.trim().is_empty())
        .map(|line| serde_json::from_str::<Value>(line).expect("json"))
        .any(|item| {
            item.get("error")
                .and_then(|error| error.get("code"))
                .and_then(|code| code.as_str())
                == Some("QUERY_TIMEOUT")
        });
    assert!(saw_timeout);
}