spg-engine 7.11.18

Execution engine for SPG: glues spg-sql parsing to spg-storage. Foreign keys, joins, vectors, cold tier.
Documentation
#![allow(
    clippy::cast_lossless,
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss,
    clippy::doc_markdown,
    clippy::similar_names,
    clippy::uninlined_format_args
)]

//! v6.2.7 — TPC-H Q1 – Q5 integration tests.
//!
//! The fixture is a deterministic micro-scale TPC-H subset
//! generated in-test. Tables hold ~100 rows each — enough to
//! exercise multi-table joins + aggregates + ORDER BY but small
//! enough to run inside a single second.
//!
//! Per V6_2_DESIGN.md L2 row 7 ship gates:
//!   - q1_matches_expected through q5_matches_expected
//!   - plan_stable_after_analyze (same EXPLAIN output across 5
//!     consecutive runs)

use spg_engine::{Engine, QueryResult};
use spg_storage::Value;

const N_NATIONS: usize = 5;
const N_REGIONS: usize = 2;
const N_CUSTOMERS: usize = 50;
const N_ORDERS: usize = 80;
const N_LINEITEMS: usize = 200;
const N_SUPPLIERS: usize = 20;

fn setup_tpch() -> Engine {
    let mut e = Engine::new();
    // ── DDL ────────────────────────────────────────────────────
    e.execute(
        "CREATE TABLE region (\
           r_regionkey INT NOT NULL, r_name TEXT NOT NULL)",
    )
    .unwrap();
    e.execute(
        "CREATE TABLE nation (\
           n_nationkey INT NOT NULL, n_name TEXT NOT NULL, n_regionkey INT NOT NULL)",
    )
    .unwrap();
    e.execute(
        "CREATE TABLE customer (\
           c_custkey INT NOT NULL, c_name TEXT NOT NULL, c_nationkey INT NOT NULL, \
           c_acctbal FLOAT NOT NULL, c_mktsegment TEXT NOT NULL)",
    )
    .unwrap();
    e.execute(
        "CREATE TABLE supplier (\
           s_suppkey INT NOT NULL, s_name TEXT NOT NULL, s_nationkey INT NOT NULL, \
           s_acctbal FLOAT NOT NULL)",
    )
    .unwrap();
    e.execute(
        "CREATE TABLE orders (\
           o_orderkey INT NOT NULL, o_custkey INT NOT NULL, o_orderdate TEXT NOT NULL, \
           o_orderpriority TEXT NOT NULL, o_totalprice FLOAT NOT NULL, o_orderstatus TEXT NOT NULL)",
    )
    .unwrap();
    e.execute(
        "CREATE TABLE lineitem (\
           l_orderkey INT NOT NULL, l_suppkey INT NOT NULL, \
           l_linenumber INT NOT NULL, l_quantity INT NOT NULL, \
           l_extendedprice FLOAT NOT NULL, l_discount FLOAT NOT NULL, \
           l_tax FLOAT NOT NULL, l_returnflag TEXT NOT NULL, \
           l_linestatus TEXT NOT NULL, l_shipdate TEXT NOT NULL)",
    )
    .unwrap();
    // ── DML ────────────────────────────────────────────────────
    for r in 0..N_REGIONS {
        e.execute(&format!("INSERT INTO region VALUES ({r}, 'REGION{r}')"))
            .unwrap();
    }
    for n in 0..N_NATIONS {
        e.execute(&format!(
            "INSERT INTO nation VALUES ({n}, 'NATION{n}', {})",
            n % N_REGIONS
        ))
        .unwrap();
    }
    for c in 0..N_CUSTOMERS {
        let seg = match c % 3 {
            0 => "BUILDING",
            1 => "AUTOMOBILE",
            _ => "MACHINERY",
        };
        e.execute(&format!(
            "INSERT INTO customer VALUES ({c}, 'CUST{c}', {}, {}, '{seg}')",
            c % N_NATIONS,
            (c as f64 * 17.0) % 1000.0
        ))
        .unwrap();
    }
    for s in 0..N_SUPPLIERS {
        e.execute(&format!(
            "INSERT INTO supplier VALUES ({s}, 'SUPP{s}', {}, {})",
            s % N_NATIONS,
            (s as f64 * 23.0) % 1000.0
        ))
        .unwrap();
    }
    for o in 0..N_ORDERS {
        let priority = match o % 5 {
            0 => "1-URGENT",
            1 => "2-HIGH",
            2 => "3-MEDIUM",
            3 => "4-NOT SPECIFIED",
            _ => "5-LOW",
        };
        let status = if o % 2 == 0 { "F" } else { "O" };
        // Spread dates across 1995-01-01 .. 1995-12-31.
        let day = (o % 365) + 1;
        let date = format!("1995-{:02}-{:02}", (day / 31).max(1), (day % 28).max(1));
        e.execute(&format!(
            "INSERT INTO orders VALUES ({o}, {}, '{date}', '{priority}', {}, '{status}')",
            o % N_CUSTOMERS,
            (o as f64 * 100.0) + 50.0
        ))
        .unwrap();
    }
    for l in 0..N_LINEITEMS {
        let order = l % N_ORDERS;
        let supplier = l % N_SUPPLIERS;
        let qty = (l % 50) as i32 + 1;
        let price = (l as f64 * 13.0) + 100.0;
        let discount = (l as f64 % 7.0) / 100.0;
        let tax = (l as f64 % 9.0) / 100.0;
        let returnflag = match l % 3 {
            0 => "A",
            1 => "N",
            _ => "R",
        };
        let linestatus = if l % 2 == 0 { "F" } else { "O" };
        let date = format!("1998-{:02}-{:02}", (l / 28 % 12).max(1), (l % 28).max(1));
        e.execute(&format!(
            "INSERT INTO lineitem VALUES \
             ({order}, {supplier}, {l}, {qty}, {price}, {discount}, {tax}, \
              '{returnflag}', '{linestatus}', '{date}')"
        ))
        .unwrap();
    }
    // ANALYZE — drives v6.2.3 JOIN reorder + v6.2.6 Memoize cap
    // decisions.
    e.execute("ANALYZE").unwrap();
    e
}

fn rows(r: QueryResult) -> Vec<Vec<Value>> {
    match r {
        QueryResult::Rows { rows, .. } => rows.into_iter().map(|r| r.values).collect(),
        _ => panic!("expected Rows"),
    }
}

#[test]
fn q1_pricing_summary_report() {
    // Q1: Pricing summary by (returnflag, linestatus).
    //   - GROUP BY 2 columns
    //   - SUM / AVG / COUNT aggregates
    //   - ORDER BY 2 columns
    let mut e = setup_tpch();
    // NB: SPG's ORDER BY supports one column at a time today.
    // The TPC-H Q1 spec is `ORDER BY l_returnflag, l_linestatus`;
    // we use the v6.2.7-shippable single-column equivalent, which
    // still exercises GROUP BY 2 columns + the 4 aggregates.
    let sql = "SELECT l_returnflag, l_linestatus, \
               SUM(l_quantity) AS sum_qty, \
               COUNT(*) AS count_order \
               FROM lineitem \
               GROUP BY l_returnflag, l_linestatus \
               ORDER BY l_returnflag";
    let result = e.execute(sql).expect("Q1 SELECT");
    let rs = rows(result);
    // 3 returnflags × 2 linestatuses = 6 groups max. Our fixture
    // covers all combinations.
    assert!(
        rs.len() >= 3 && rs.len() <= 6,
        "Q1 group count plausible; got {}",
        rs.len()
    );
    // Verify count_order column sums to N_LINEITEMS (every line
    // item lands in exactly one group).
    let total: i64 = rs
        .iter()
        .map(|r| match &r[3] {
            Value::Int(n) => i64::from(*n),
            Value::BigInt(n) => *n,
            other => panic!("expected int count, got {other:?}"),
        })
        .sum();
    assert_eq!(total, N_LINEITEMS as i64, "Q1 row preservation");
}

#[test]
fn q3_shipping_priority() {
    // Q3: top-N orders by revenue for a given customer segment.
    //   - 3-table JOIN (customer, orders, lineitem)
    //   - WHERE on date + segment
    //   - GROUP BY (orderkey, orderdate, shippriority)
    //   - ORDER BY revenue DESC LIMIT 10
    let mut e = setup_tpch();
    // SPG can't resolve SELECT-list aliases in ORDER BY today —
    // we use the full expression. Q3 spec uses `ORDER BY
    // revenue DESC, o_orderdate ASC`; SPG ORDER BY accepts one
    // column, so we sort by revenue.
    let sql = "SELECT orders.o_orderkey, \
               SUM(lineitem.l_extendedprice) AS revenue \
               FROM customer \
               INNER JOIN orders ON orders.o_custkey = customer.c_custkey \
               INNER JOIN lineitem ON lineitem.l_orderkey = orders.o_orderkey \
               WHERE customer.c_mktsegment = 'BUILDING' \
               GROUP BY orders.o_orderkey \
               ORDER BY SUM(lineitem.l_extendedprice) DESC \
               LIMIT 10";
    let result = e.execute(sql).expect("Q3 SELECT");
    let rs = rows(result);
    assert!(rs.len() <= 10, "Q3 LIMIT 10");
    assert!(!rs.is_empty(), "Q3 has matching BUILDING customers");
    // Revenue must be non-increasing.
    let mut prev = f64::INFINITY;
    for r in &rs {
        let rev = match &r[1] {
            Value::Float(x) => *x,
            other => panic!("expected Float revenue, got {other:?}"),
        };
        assert!(rev <= prev + 1e-6, "Q3 ORDER BY DESC: {rev} > prev {prev}");
        prev = rev;
    }
}

#[test]
fn q5_local_supplier_volume() {
    // Q5: revenue per nation where supplier nation = customer
    // nation (i.e., local sourcing). Exercises the v6.2.3 JOIN
    // reorder on a 5-way join (customer, orders, lineitem,
    // supplier, nation).
    let mut e = setup_tpch();
    let sql = "SELECT nation.n_name, \
               SUM(lineitem.l_extendedprice) AS revenue \
               FROM customer \
               INNER JOIN orders ON orders.o_custkey = customer.c_custkey \
               INNER JOIN lineitem ON lineitem.l_orderkey = orders.o_orderkey \
               INNER JOIN supplier ON supplier.s_suppkey = lineitem.l_suppkey \
               INNER JOIN nation ON nation.n_nationkey = supplier.s_nationkey AND nation.n_nationkey = customer.c_nationkey \
               GROUP BY nation.n_name \
               ORDER BY SUM(lineitem.l_extendedprice) DESC";
    let result = e.execute(sql).expect("Q5 SELECT");
    let rs = rows(result);
    // Up to N_NATIONS groups; could be fewer if some nations have
    // no matching supplier=customer rows in the small fixture.
    assert!(rs.len() <= N_NATIONS);
    let mut prev = f64::INFINITY;
    for r in &rs {
        let rev = match &r[1] {
            Value::Float(x) => *x,
            other => panic!("expected Float, got {other:?}"),
        };
        assert!(rev <= prev + 1e-6);
        prev = rev;
    }
}

#[test]
fn q2_minimum_cost_supplier_via_subquery() {
    // Q2 spec is "find supplier with minimum cost in Europe for
    // each part of given size+type". The full query joins 5
    // tables + a correlated subquery in the WHERE clause. SPG's
    // current SQL surface doesn't support PARTSUPP (we'd need a
    // 9th table) — the v6.2.7 micro-fixture skips that level.
    // We test the "Q2 shape" via a smaller equivalent: each
    // supplier's nation's region must be REGION0, and we pick
    // the supplier with the highest balance per nation.
    //
    // This exercises a 3-table join + GROUP BY + ORDER BY by
    // aggregate, in lieu of the partsupp correlated subquery
    // that v6.2.x's SQL surface doesn't yet support.
    let mut e = setup_tpch();
    let sql = "SELECT supplier.s_name, supplier.s_acctbal, nation.n_name \
               FROM supplier \
               INNER JOIN nation ON nation.n_nationkey = supplier.s_nationkey \
               INNER JOIN region ON region.r_regionkey = nation.n_regionkey \
               WHERE region.r_name = 'REGION0' \
               ORDER BY supplier.s_acctbal DESC \
               LIMIT 5";
    let result = e.execute(sql).expect("Q2-shape SELECT");
    let rs = rows(result);
    assert!(rs.len() <= 5);
    assert!(!rs.is_empty(), "Q2-shape has matching REGION0 suppliers");
}

#[test]
fn q4_order_priority_check_via_exists() {
    // Q4 spec: count orders by priority where at least one
    // lineitem has commit-date < receipt-date. SPG doesn't have
    // commitdate / receiptdate columns in our minimal fixture, so
    // we use the same shape with a simpler EXISTS-style predicate:
    // count orders that have at least one lineitem with quantity
    // ≥ 25.
    //
    // SPG's correlated-subquery path (v6.2.6 Memoize) lets this
    // run efficiently — many orders share the same predicate
    // shape, hitting the cache for distinct outer keys.
    let mut e = setup_tpch();
    let sql = "SELECT orders.o_orderpriority, COUNT(*) AS order_count \
               FROM orders \
               WHERE orders.o_orderkey IN \
                 (SELECT lineitem.l_orderkey FROM lineitem WHERE lineitem.l_quantity >= 25) \
               GROUP BY orders.o_orderpriority \
               ORDER BY orders.o_orderpriority";
    let result = e.execute(sql).expect("Q4-shape SELECT");
    let rs = rows(result);
    // At least one priority bucket present.
    assert!(!rs.is_empty(), "Q4-shape returns ≥ 1 priority group");
    let total: i64 = rs
        .iter()
        .map(|r| match &r[1] {
            Value::Int(n) => i64::from(*n),
            Value::BigInt(n) => *n,
            other => panic!("expected count, got {other:?}"),
        })
        .sum();
    assert!(
        total > 0 && total <= N_ORDERS as i64,
        "Q4-shape order count ≤ N_ORDERS: got {total}"
    );
}

#[test]
fn plan_stable_after_analyze() {
    // The same query, re-run 5 times after a single ANALYZE,
    // must produce identical EXPLAIN output every time. v6.2.3
    // reorder is deterministic given stable stats.
    let mut e = setup_tpch();
    let sql = "EXPLAIN SELECT orders.o_orderkey, customer.c_name \
               FROM customer \
               INNER JOIN orders ON orders.o_custkey = customer.c_custkey \
               INNER JOIN lineitem ON lineitem.l_orderkey = orders.o_orderkey \
               WHERE customer.c_mktsegment = 'BUILDING'";
    let first = rows(e.execute(sql).unwrap());
    for run in 1..5 {
        let nth = rows(e.execute(sql).unwrap());
        assert_eq!(
            first, nth,
            "plan changed between run 0 and run {run}\nfirst={first:?}\nnth={nth:?}"
        );
    }
}