kip-sql 0.0.1-alpha.8

build the SQL layer of KipDB database
Documentation
use criterion::{criterion_group, criterion_main, Criterion};
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use kip_sql::db::{Database, DatabaseError};
use kip_sql::execution::{codegen, volcano};
use kip_sql::storage::kip::KipStorage;
use kip_sql::storage::Storage;
use sqlite::Error;
use std::cell::RefCell;
use std::fs;
use std::sync::Arc;

const QUERY_CASE: &'static str = "select * from t1";
const QUERY_BENCH_KIPSQL_PATH: &'static str = "./kipsql_bench";
const QUERY_BENCH_SQLITE_PATH: &'static str = "./sqlite_bench";
const TABLE_ROW_NUM: u64 = 2_00_000;

async fn init_kipsql_query_bench() -> Result<(), DatabaseError> {
    let database = Database::with_kipdb(QUERY_BENCH_KIPSQL_PATH).await.unwrap();
    database
        .run("create table t1 (c1 int primary key, c2 int)")
        .await?;
    let pb = ProgressBar::new(TABLE_ROW_NUM);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}")
            .unwrap(),
    );

    for i in 0..TABLE_ROW_NUM {
        let _ = database
            .run(format!("insert into t1 values({}, {})", i, i + 1).as_str())
            .await?;
        pb.set_position(i + 1);
    }
    pb.finish_with_message("Insert completed!");

    Ok(())
}

fn init_sqlite_query_bench() -> Result<(), Error> {
    let connection = sqlite::open(QUERY_BENCH_SQLITE_PATH.to_owned() + "/data")?;

    let _ = connection.execute("create table t1 (c1 int primary key, c2 int)")?;

    let pb = ProgressBar::new(TABLE_ROW_NUM);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("[{elapsed_precise}] {bar:40.cyan/white} {pos}/{len} {msg}")
            .unwrap(),
    );

    for i in 0..TABLE_ROW_NUM {
        let _ = connection.execute(format!("insert into t1 values({}, {})", i, i + 1))?;
        pb.set_position(i + 1);
    }
    pb.finish_with_message("Insert completed!");

    Ok(())
}

fn path_exists_and_is_directory(path: &str) -> bool {
    match fs::metadata(path) {
        Ok(metadata) => metadata.is_dir(),
        Err(_) => false,
    }
}

fn query_on_execute(c: &mut Criterion) {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(8)
        .enable_all()
        .build()
        .unwrap();
    let database = rt.block_on(async {
        if !path_exists_and_is_directory(QUERY_BENCH_SQLITE_PATH) {
            println!(
                "SQLITE: The table is not initialized and data insertion is started. => {}",
                TABLE_ROW_NUM
            );

            init_sqlite_query_bench().unwrap();
        }
        if !path_exists_and_is_directory(QUERY_BENCH_KIPSQL_PATH) {
            println!(
                "KipSQL: The table is not initialized and data insertion is started. => {}",
                TABLE_ROW_NUM
            );

            init_kipsql_query_bench().await.unwrap();
        }

        Database::<KipStorage>::with_kipdb(QUERY_BENCH_KIPSQL_PATH)
            .await
            .unwrap()
    });

    println!("Table initialization completed");

    let (codegen_transaction, plan) = rt.block_on(async {
        let transaction = database.storage.transaction().await.unwrap();
        let (plan, _) = Database::<KipStorage>::build_plan(QUERY_CASE, &transaction).unwrap();

        (Arc::new(transaction), plan)
    });

    c.bench_function(format!("Codegen: {}", QUERY_CASE).as_str(), |b| {
        b.to_async(&rt).iter(|| async {
            let tuples = codegen::execute(plan.clone(), codegen_transaction.clone())
                .await
                .unwrap();
            if tuples.len() as u64 != TABLE_ROW_NUM {
                panic!("{}", tuples.len());
            }
        })
    });

    let (volcano_transaction, plan) = rt.block_on(async {
        let transaction = database.storage.transaction().await.unwrap();
        let (plan, _) = Database::<KipStorage>::build_plan(QUERY_CASE, &transaction).unwrap();

        (RefCell::new(transaction), plan)
    });

    c.bench_function(format!("Volcano: {}", QUERY_CASE).as_str(), |b| {
        b.to_async(&rt).iter(|| async {
            let mut stream = volcano::build_stream(plan.clone(), &volcano_transaction);
            let tuples = volcano::try_collect(&mut stream).await.unwrap();
            if tuples.len() as u64 != TABLE_ROW_NUM {
                panic!("{}", tuples.len());
            }
        })
    });

    c.bench_function(
        format!(
            "KipSQL: select * from t1 where c1 > {} limit 3000 offset 200",
            TABLE_ROW_NUM / 2
        )
        .as_str(),
        |b| {
            b.to_async(&rt).iter(|| async {
                let _tuples = database.run(QUERY_CASE).await.unwrap();
            })
        },
    );

    let connection = sqlite::open(QUERY_BENCH_SQLITE_PATH.to_owned() + "/data").unwrap();
    c.bench_function(
        format!(
            "SQLite: select * from t1 where c2 > {} limit 3000 offset 200",
            TABLE_ROW_NUM / 2
        )
        .as_str(),
        |b| {
            b.to_async(&rt).iter(|| async {
                let _tuples = connection
                    .prepare(QUERY_CASE)
                    .unwrap()
                    .into_iter()
                    .map(|row| row.unwrap())
                    .collect_vec();
            })
        },
    );
}

criterion_group!(
    name = query_benches;
    config = Criterion::default().sample_size(10);
    targets = query_on_execute
);

criterion_main!(query_benches,);