duckdb-server 0.2.0

DuckDB Server for Mosaic.
Documentation
use anyhow::Result;
use arrow::{
    array::{Int32Array, RecordBatch},
    datatypes::{DataType, Field, Schema},
    ipc::reader::FileReader,
};
use axum::{
    body::Body,
    http::{self, Request, StatusCode},
};
use http_body_util::BodyExt;
use serde_json::json;
use std::sync::Arc;
use temp_testdir::TempDir;
use tokio::sync::Mutex;
use tower::ServiceExt;

use crate::bundle::{create, load, Query};
use crate::cache::get_key;
use crate::db::ConnectionPool;
use crate::interfaces::QueryParams;
use crate::interfaces::QueryResponse;
use crate::interfaces::{AppState, Command};
use crate::{app, query::handle};

#[test]
fn key() {
    let key = get_key("SELECT 1", &Command::Arrow);
    assert_eq!(
        key,
        "e004ebd5b5532a4b85984a62f8ad48a81aa3460c1ca07701f386135d72cdecf5.arrow"
    );
}

#[tokio::test]
async fn get_json() -> Result<()> {
    let db = ConnectionPool::new(":memory:", 1)?;
    let cache = lru::LruCache::new(10.try_into()?);

    let state = Arc::new(AppState {
        db: Box::new(db),
        cache: Mutex::new(cache),
    });

    let params = QueryParams {
        query_type: Some(Command::Json),
        sql: Some("SELECT 1 AS foo".to_string()),
        ..QueryParams::default()
    };

    let json = handle(&state, params).await.unwrap();

    if let QueryResponse::Json(json) = json {
        assert_eq!(json, "[{\"foo\":1}]");
    }

    Ok(())
}

#[tokio::test]
async fn get_arrow() -> Result<()> {
    let db = ConnectionPool::new(":memory:", 1)?;
    let cache = lru::LruCache::new(10.try_into()?);

    let state = Arc::new(AppState {
        db: Box::new(db),
        cache: Mutex::new(cache),
    });

    let params = QueryParams {
        query_type: Some(Command::Arrow),
        sql: Some("SELECT 1 AS foo".to_string()),
        ..QueryParams::default()
    };

    let arrow = handle(&state, params).await.unwrap();

    if let QueryResponse::Arrow(arrow) = arrow {
        let mut reader = FileReader::try_new(std::io::Cursor::new(arrow), None)?;
        let actual_batch = reader.next().unwrap();
        let actual_batch = actual_batch?;

        let schema = Arc::new(Schema::new(vec![Field::new("foo", DataType::Int32, true)]));
        let foo_values = Int32Array::from(vec![1]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(foo_values)])?;

        assert_eq!(actual_batch, batch);
    }

    Ok(())
}

#[tokio::test]
async fn select_1_get() -> Result<()> {
    let app = app::app(None, None, None)?;

    let response = app
        .oneshot(
            Request::builder()
                .uri("/?type=json&sql=SELECT%201%20as%20foo")
                .body(Body::empty())?,
        )
        .await?;

    assert_eq!(response.status(), StatusCode::OK);

    let body = response.into_body().collect().await?.to_bytes();
    assert_eq!(&body[..], b"[{\"foo\":1}]");

    Ok(())
}

#[tokio::test]
async fn select_1_post() -> Result<()> {
    let app = app::app(None, None, None)?;

    let response = app
        .oneshot(
            Request::builder()
                .method(http::Method::POST)
                .uri("/")
                .header(http::header::CONTENT_TYPE, "application/json")
                .body(Body::from(serde_json::to_vec(
                    &json!({"type": "json", "sql": "select 1 as foo"}),
                )?))?,
        )
        .await?;

    assert_eq!(response.status(), StatusCode::OK);

    let body = response.into_body().collect().await?.to_bytes();
    assert_eq!(&body[..], b"[{\"foo\":1}]");

    Ok(())
}

#[tokio::test]
async fn query_arrow() -> Result<()> {
    let app = app::app(None, None, None)?;

    let response = app
        .oneshot(
            Request::builder()
                .method(http::Method::POST)
                .uri("/")
                .header(http::header::CONTENT_TYPE, "application/json")
                .body(Body::from(serde_json::to_vec(
                    &json!({"type": "arrow", "sql": "select 1 as foo"}),
                )?))?,
        )
        .await?;

    assert_eq!(response.status(), StatusCode::OK);

    let body = response.into_body().collect().await?;

    let mut reader = FileReader::try_new(std::io::Cursor::new(body.to_bytes()), None)?;
    let actual_batch = reader.next().unwrap();
    let actual_batch = actual_batch?;

    let schema = Arc::new(Schema::new(vec![Field::new("foo", DataType::Int32, true)]));
    let foo_values = Int32Array::from(vec![1]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(foo_values)])?;

    assert_eq!(actual_batch, batch);

    Ok(())
}

#[tokio::test]
async fn create_and_load_bundle() -> Result<()> {
    let temp = TempDir::default();

    let db = ConnectionPool::new(":memory:", 1)?;
    let cache = &Mutex::new(lru::LruCache::new(10.try_into()?));

    let queries = vec![
        Query {sql: r#"CREATE TABLE IF NOT EXISTS flights AS SELECT * FROM read_parquet("data/flights-200k.parquet")"#.to_string(), alias: None},
        Query {sql: r#"SELECT count(*) FROM "flights""#.to_string(), alias: None},
    ];

    assert_eq!(cache.lock().await.len(), 0);

    create(&db, cache, queries, &temp).await?;

    assert_eq!(cache.lock().await.len(), 1);
    cache.lock().await.clear();

    load(&db, cache, &temp).await?;

    let cache = cache;
    assert_eq!(cache.lock().await.len(), 1);

    Ok(())
}