iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use super::{execute_serialized_plan, temp_dir, ExecuteParams};
use crate::features::storage::api as storage_api;
use ::plexus_conformance::{run_conformance_suite, ConformanceCase, ConformanceSetup};
use ::plexus_engine::{QueryResult, Value};
use plexus_serde::{
    current_plan_version, serialize_plan, CmpOp, ColDef, ColKind, Expr, LogicalType, Op, SortDir,
};

/// Builds the leaf `Op::ScanNodes` operator used as operator 0 of every
/// conformance plan in this file.
///
/// The scan leaves all three label lists empty and exposes a single column
/// named `n` of kind `ColKind::Node` / logical type `LogicalType::NodeRef`.
fn scan_nodes() -> Op {
    // The only output column: the scanned node itself.
    let node_column = ColDef {
        name: String::from("n"),
        kind: ColKind::Node,
        logical_type: LogicalType::NodeRef,
    };

    Op::ScanNodes {
        labels: vec![],
        schema: vec![node_column],
        must_labels: vec![],
        forbidden_labels: vec![],
        // Planner hints; fixed values are sufficient for these tiny test plans.
        est_rows: 64,
        selectivity: 1.0,
        graph_ref: None,
    }
}

/// Wraps `ops` in a `plexus_serde::Plan` whose root is `root_op` and returns
/// the serialized plan bytes.
///
/// The plan version is obtained from `current_plan_version` using the
/// producer string `"iridium-conformance-tests"`.
///
/// # Panics
///
/// Panics with `"serialize conformance plan"` if `serialize_plan` fails.
fn serialize_ops(ops: Vec<Op>, root_op: u32) -> Vec<u8> {
    let version = current_plan_version("iridium-conformance-tests");
    let plan = plexus_serde::Plan {
        version,
        ops,
        root_op,
    };
    serialize_plan(&plan).expect("serialize conformance plan")
}

/// Opens a fresh store under a temporary directory derived from `prefix` and
/// seeds it with three nodes (ids 1, 2 and 3) before flushing.
///
/// The WAL directory, manifest file and SSTable directory all live beneath
/// the directory returned by `temp_dir(prefix)`.
///
/// # Panics
///
/// Panics if the store cannot be opened, if any seed write fails, or if the
/// final flush fails.
fn seeded_handle(prefix: &str) -> storage_api::StorageHandle {
    let root = temp_dir(prefix);
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).expect("open store");

    // Seed data: each node is written with a growing trailing list (1, 2 and 3 entries).
    // NOTE(review): the trailing slice is presumably the adjacency list that
    // backs the `adjacency_degree` property used by the filters — confirm
    // against `put_full_node`.
    storage_api::put_full_node(&mut store, 1, 1, &[2]).expect("seed node 1");
    storage_api::put_full_node(&mut store, 2, 1, &[3, 4]).expect("seed node 2");
    storage_api::put_full_node(&mut store, 3, 1, &[4, 5, 6]).expect("seed node 3");

    storage_api::flush(&mut store).expect("flush seed");
    store
}

fn run_case(case: &ConformanceCase) -> std::result::Result<QueryResult, String> {
    let (plan, params) = match case.name.as_str() {
        "readonly_scan_limit" => (
            serialize_ops(
                vec![
                    scan_nodes(),
                    Op::Limit {
                        input: 0,
                        count: 2,
                        skip: 0,
                    },
                    Op::Return { input: 1 },
                ],
                2,
            ),
            ExecuteParams {
                scan_start: 1,
                scan_end_exclusive: 4,
                morsel_size: 8,
                parallel_workers: 1,
            },
        ),
        "readonly_union_sort_limit" => (
            serialize_ops(
                vec![
                    scan_nodes(),
                    Op::Filter {
                        input: 0,
                        predicate: Expr::Cmp {
                            op: CmpOp::Ge,
                            lhs: Box::new(Expr::PropAccess {
                                col: 0,
                                prop: "adjacency_degree".to_string(),
                            }),
                            rhs: Box::new(Expr::IntLiteral(2)),
                        },
                    },
                    Op::Filter {
                        input: 0,
                        predicate: Expr::Cmp {
                            op: CmpOp::Ge,
                            lhs: Box::new(Expr::PropAccess {
                                col: 0,
                                prop: "adjacency_degree".to_string(),
                            }),
                            rhs: Box::new(Expr::IntLiteral(1)),
                        },
                    },
                    Op::Union {
                        lhs: 1,
                        rhs: 2,
                        all: true,
                        schema: Vec::new(),
                    },
                    Op::Sort {
                        input: 3,
                        keys: vec![0],
                        dirs: vec![SortDir::Desc],
                    },
                    Op::Limit {
                        input: 4,
                        count: 2,
                        skip: 0,
                    },
                    Op::Return { input: 5 },
                ],
                6,
            ),
            ExecuteParams {
                scan_start: 1,
                scan_end_exclusive: 4,
                morsel_size: 8,
                parallel_workers: 1,
            },
        ),
        "unsupported_post_expand_filter" => (
            serialize_ops(
                vec![
                    scan_nodes(),
                    Op::Expand {
                        input: 0,
                        src_col: 0,
                        types: Vec::new(),
                        dir: plexus_serde::ExpandDir::Out,
                        schema: Vec::new(),
                        src_var: "n".to_string(),
                        rel_var: "r".to_string(),
                        dst_var: "m".to_string(),
                        legal_src_labels: Vec::new(),
                        legal_dst_labels: Vec::new(),
                        est_degree: 1.0,
                        graph_ref: None,
                    },
                    Op::Filter {
                        input: 1,
                        predicate: Expr::Cmp {
                            op: CmpOp::Ge,
                            lhs: Box::new(Expr::PropAccess {
                                col: 0,
                                prop: "adjacency_degree".to_string(),
                            }),
                            rhs: Box::new(Expr::IntLiteral(1)),
                        },
                    },
                    Op::Return { input: 2 },
                ],
                3,
            ),
            ExecuteParams::default(),
        ),
        "readonly_optional_expand_limit" => (
            serialize_ops(
                vec![
                    scan_nodes(),
                    Op::OptionalExpand {
                        input: 0,
                        src_col: 0,
                        types: Vec::new(),
                        dir: plexus_serde::ExpandDir::Out,
                        schema: Vec::new(),
                        src_var: "n".to_string(),
                        rel_var: "r".to_string(),
                        dst_var: "m".to_string(),
                        legal_src_labels: Vec::new(),
                        legal_dst_labels: Vec::new(),
                        graph_ref: None,
                    },
                    Op::Limit {
                        input: 1,
                        count: 2,
                        skip: 0,
                    },
                    Op::Return { input: 2 },
                ],
                3,
            ),
            ExecuteParams {
                scan_start: 1,
                scan_end_exclusive: 4,
                morsel_size: 8,
                parallel_workers: 1,
            },
        ),
        "readonly_grouped_count_star" => (
            serialize_ops(
                vec![
                    scan_nodes(),
                    Op::Aggregate {
                        input: 0,
                        keys: vec![0],
                        aggs: vec![Expr::Agg {
                            fn_: plexus_serde::AggFn::CountStar,
                            expr: None,
                        }],
                        schema: Vec::new(),
                    },
                    Op::Return { input: 1 },
                ],
                2,
            ),
            ExecuteParams {
                scan_start: 1,
                scan_end_exclusive: 4,
                morsel_size: 8,
                parallel_workers: 1,
            },
        ),
        other => return Err(format!("unknown conformance case '{}'", other)),
    };

    let mut handle = seeded_handle("runtime_plexus_conformance_subset");
    let out = execute_serialized_plan(&plan, &params, &mut handle)
        .map_err(|error| format!("{:?}", error))?;

    let rows = out
        .rows
        .into_iter()
        .map(|row| vec![Value::Int(row.node_id as i64)])
        .collect::<Vec<_>>();
    Ok(QueryResult { rows })
}

/// Runs the five-case conformance subset through `run_conformance_suite`
/// using `run_case` as the executor and asserts that every case passes.
#[test]
fn plexus_conformance_subset_runner_reports_pass() {
    // A result row consisting of a single integer node id.
    let row = |id| vec![Value::Int(id)];
    // Builds a case with the fields that are identical across this subset
    // (`plan_mlir: None`, `any_order: false`, default setup) filled in.
    let case = |name: &str,
                query: &str,
                expected_rows: Vec<Vec<Value>>,
                expected_error: Option<&str>,
                tags: [&str; 3]| ConformanceCase {
        name: name.to_string(),
        query: query.to_string(),
        plan_mlir: None,
        expected_rows,
        expected_error_contains: expected_error.map(|text| text.to_string()),
        any_order: false,
        tags: tags.iter().map(|tag| tag.to_string()).collect(),
        setup: ConformanceSetup::default(),
    };

    let cases = vec![
        case(
            "readonly_scan_limit",
            "MATCH (n) RETURN n LIMIT 2",
            vec![row(1), row(2)],
            None,
            ["readonly", "match", "return"],
        ),
        case(
            "readonly_union_sort_limit",
            "MATCH (n) ... UNION ALL ... RETURN ...",
            vec![row(3), row(3)],
            None,
            ["readonly", "union", "return"],
        ),
        // NOTE(review): this case uses a plain `Expand`, so the "optional"
        // tag looks like a copy-paste leftover — confirm before changing it,
        // since tags may drive case selection elsewhere.
        case(
            "unsupported_post_expand_filter",
            "MATCH (n)-->(m) WHERE ... RETURN m",
            Vec::new(),
            Some("post-expand filtering"),
            ["readonly", "optional", "where"],
        ),
        case(
            "readonly_optional_expand_limit",
            "MATCH (n) OPTIONAL MATCH (n)-->(m) RETURN n LIMIT 2",
            vec![row(2), row(3)],
            None,
            ["readonly", "optional", "limit"],
        ),
        case(
            "readonly_grouped_count_star",
            "MATCH (n) RETURN n, count(*)",
            vec![row(1), row(2), row(3)],
            None,
            ["readonly", "aggregate", "return"],
        ),
    ];

    let report = run_conformance_suite(&cases, run_case);
    assert_eq!(report.total, 5);
    assert_eq!(report.passed, 5);
    assert_eq!(report.failed, 0);
}