citadeldb-sql 0.16.1

SQL parser, planner, and executor for Citadel encrypted database
Documentation
use super::*;
use crate::parser::{BinOp, Expr};
use crate::types::{Collation, ColumnDef, DataType, TableSchema, Value};

fn col(name: &str, dt: DataType) -> ColumnDef {
    ColumnDef {
        name: name.into(),
        data_type: dt,
        nullable: true,
        position: 0,
        default_expr: None,
        default_sql: None,
        check_expr: None,
        check_sql: None,
        check_name: None,
        is_with_timezone: false,
        generated_expr: None,
        generated_sql: None,
        generated_kind: None,
        collation: Collation::Binary,
    }
}

fn cols(specs: &[(&str, DataType)]) -> Vec<ColumnDef> {
    specs
        .iter()
        .enumerate()
        .map(|(i, (n, t))| {
            let mut c = col(n, *t);
            c.position = i as u16;
            c
        })
        .collect()
}

fn schema(name: &str, cs: Vec<ColumnDef>, pk: Vec<u16>) -> TableSchema {
    TableSchema::new(name.into(), cs, pk, vec![], vec![], vec![])
}

fn i(n: i64) -> Value {
    Value::Integer(n)
}

#[test]
fn resolves_in_lowercases_input() {
    let ts = schema(
        "t",
        cols(&[("name", DataType::Text), ("id", DataType::Integer)]),
        vec![1],
    );
    assert!(resolves_in("name", &ts));
    assert!(resolves_in("NAME", &ts));
    assert!(resolves_in("id", &ts));
}

#[test]
fn resolves_in_unknown_column_false() {
    let ts = schema("t", cols(&[("id", DataType::Integer)]), vec![0]);
    assert!(!resolves_in("missing", &ts));
}

#[test]
fn col_name_lower_column() {
    assert_eq!(col_name_lower(&Expr::Column("X".into())), Some("x".into()));
}

#[test]
fn col_name_lower_qualified() {
    assert_eq!(
        col_name_lower(&Expr::QualifiedColumn {
            table: "T".into(),
            column: "Col".into(),
        }),
        Some("col".into())
    );
}

#[test]
fn col_name_lower_non_column_returns_none() {
    assert_eq!(col_name_lower(&Expr::Literal(i(1))), None);
}

#[test]
fn collect_column_names_single_column() {
    let mut out = Vec::new();
    collect_column_names(&Expr::Column("X".into()), &mut out);
    assert_eq!(out, vec!["x"]);
}

#[test]
fn collect_column_names_qualified_lowercase() {
    let mut out = Vec::new();
    collect_column_names(
        &Expr::QualifiedColumn {
            table: "T".into(),
            column: "Col".into(),
        },
        &mut out,
    );
    assert_eq!(out, vec!["t.col"]);
}

#[test]
fn collect_column_names_binary_op_collects_both_sides() {
    let mut out = Vec::new();
    let e = Expr::BinaryOp {
        left: Box::new(Expr::Column("a".into())),
        op: BinOp::Eq,
        right: Box::new(Expr::Column("b".into())),
    };
    collect_column_names(&e, &mut out);
    assert_eq!(out, vec!["a", "b"]);
}

#[test]
fn collect_column_names_literal_yields_empty() {
    let mut out = Vec::new();
    collect_column_names(&Expr::Literal(i(1)), &mut out);
    assert!(out.is_empty());
}

#[test]
fn collect_column_names_function_args() {
    let mut out = Vec::new();
    let e = Expr::Function {
        name: "ABS".into(),
        args: vec![Expr::Column("x".into())],
        distinct: false,
    };
    collect_column_names(&e, &mut out);
    assert_eq!(out, vec!["x"]);
}

#[test]
fn collect_column_names_coalesce_collects_all() {
    let mut out = Vec::new();
    let e = Expr::Coalesce(vec![
        Expr::Column("a".into()),
        Expr::Column("b".into()),
        Expr::Column("c".into()),
    ]);
    collect_column_names(&e, &mut out);
    assert_eq!(out, vec!["a", "b", "c"]);
}

#[test]
fn collect_column_names_case_branches() {
    let mut out = Vec::new();
    let e = Expr::Case {
        operand: Some(Box::new(Expr::Column("op".into()))),
        conditions: vec![(Expr::Column("c".into()), Expr::Column("r".into()))],
        else_result: Some(Box::new(Expr::Column("el".into()))),
    };
    collect_column_names(&e, &mut out);
    assert_eq!(out, vec!["op", "c", "r", "el"]);
}

#[test]
fn collect_column_names_between() {
    let mut out = Vec::new();
    let e = Expr::Between {
        expr: Box::new(Expr::Column("x".into())),
        low: Box::new(Expr::Column("lo".into())),
        high: Box::new(Expr::Column("hi".into())),
        negated: false,
    };
    collect_column_names(&e, &mut out);
    assert_eq!(out, vec!["x", "lo", "hi"]);
}

#[test]
fn collect_column_names_unary_and_isnull() {
    let mut out = Vec::new();
    let inner = Expr::Column("x".into());
    collect_column_names(&Expr::IsNull(Box::new(inner.clone())), &mut out);
    collect_column_names(&Expr::IsNotNull(Box::new(inner)), &mut out);
    assert_eq!(out, vec!["x", "x"]);
}

#[test]
fn flatten_and_exprs_no_and_returns_single() {
    let e = Expr::Literal(i(1));
    let v = flatten_and_exprs(&e);
    assert_eq!(v.len(), 1);
}

#[test]
fn flatten_and_exprs_chained_and_flattens() {
    let inner = Expr::BinaryOp {
        left: Box::new(Expr::Column("a".into())),
        op: BinOp::And,
        right: Box::new(Expr::Column("b".into())),
    };
    let outer = Expr::BinaryOp {
        left: Box::new(inner),
        op: BinOp::And,
        right: Box::new(Expr::Column("c".into())),
    };
    let v = flatten_and_exprs(&outer);
    assert_eq!(v.len(), 3);
}

#[test]
fn flatten_and_exprs_or_does_not_flatten() {
    let e = Expr::BinaryOp {
        left: Box::new(Expr::Column("a".into())),
        op: BinOp::Or,
        right: Box::new(Expr::Column("b".into())),
    };
    let v = flatten_and_exprs(&e);
    assert_eq!(v.len(), 1);
}

#[test]
fn has_correlated_in_expr_with_scalar_subquery() {
    let outer = schema("o", cols(&[("x", DataType::Integer)]), vec![]);
    let ctx = CorrelationCtx {
        outer_schema: &outer,
        outer_alias: None,
    };
    let mgr = crate::schema::SchemaManager::empty();
    let e = Expr::Literal(i(0));
    assert!(!has_correlated_in_expr(&e, &ctx, &mgr));
}

#[test]
fn has_correlated_in_expr_binary_op_propagates() {
    let outer = schema("o", cols(&[("x", DataType::Integer)]), vec![]);
    let ctx = CorrelationCtx {
        outer_schema: &outer,
        outer_alias: None,
    };
    let mgr = crate::schema::SchemaManager::empty();
    let e = Expr::BinaryOp {
        left: Box::new(Expr::Column("a".into())),
        op: BinOp::Eq,
        right: Box::new(Expr::Literal(i(1))),
    };
    assert!(!has_correlated_in_expr(&e, &ctx, &mgr));
}

#[test]
fn has_correlated_select_no_subquery_in_columns() {
    let outer = schema("o", cols(&[("x", DataType::Integer)]), vec![]);
    let ctx = CorrelationCtx {
        outer_schema: &outer,
        outer_alias: None,
    };
    let mgr = crate::schema::SchemaManager::empty();
    let columns = vec![crate::parser::SelectColumn::Expr {
        expr: Expr::Column("a".into()),
        alias: None,
    }];
    assert!(!has_correlated_select(&columns, &ctx, &mgr));
}

#[test]
fn has_correlated_where_no_where_clause() {
    let outer = schema("o", cols(&[("x", DataType::Integer)]), vec![]);
    let ctx = CorrelationCtx {
        outer_schema: &outer,
        outer_alias: None,
    };
    let mgr = crate::schema::SchemaManager::empty();
    assert!(!has_correlated_where(&None, &ctx, &mgr));
}