iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

#[test]
fn execute_serialized_plan_sorts_desc_and_applies_limit() {
    let base = temp_dir("runtime_plexus_sort_limit");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    })
    .unwrap();

    for node_id in 1..=5 {
        storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();

    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Sort {
                input: 0,
                keys: vec![0],
                dirs: vec![SortDir::Desc],
            },
            Op::Limit {
                input: 1,
                count: 2,
                skip: 0,
            },
            Op::Return { input: 2 },
        ],
        3,
    );
    let stream = execute_serialized_plan(
        &plan,
        &ExecuteParams {
            scan_start: 1,
            scan_end_exclusive: 6,
            morsel_size: 4,
            parallel_workers: 1,
        },
        &mut handle,
    )
    .unwrap();

    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![5, 4]);
}

#[test]
fn execute_serialized_plan_accepts_identity_project_exprs() {
    let base = temp_dir("runtime_plexus_identity_project");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    })
    .unwrap();

    storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut handle, 2, 1, &[3]).unwrap();
    storage_api::flush(&mut handle).unwrap();

    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Project {
                input: 0,
                exprs: vec![Expr::ColRef { idx: 0 }],
                schema: Vec::new(),
            },
            Op::Return { input: 1 },
        ],
        2,
    );
    let stream = execute_serialized_plan(
        &plan,
        &ExecuteParams {
            scan_start: 1,
            scan_end_exclusive: 3,
            morsel_size: 4,
            parallel_workers: 1,
        },
        &mut handle,
    )
    .unwrap();

    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![1, 2]);
}

#[test]
fn execute_serialized_plan_accepts_project_with_supported_trailing_exprs() {
    let base = temp_dir("runtime_plexus_project_trailing_exprs");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    })
    .unwrap();

    storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut handle, 2, 1, &[3]).unwrap();
    storage_api::flush(&mut handle).unwrap();

    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Project {
                input: 0,
                exprs: vec![
                    Expr::ColRef { idx: 0 },
                    Expr::PropAccess {
                        col: 0,
                        prop: "adjacency_degree".to_string(),
                    },
                    Expr::IntLiteral(42),
                    Expr::BoolLiteral(true),
                ],
                schema: Vec::new(),
            },
            Op::Return { input: 1 },
        ],
        2,
    );
    let stream = execute_serialized_plan(
        &plan,
        &ExecuteParams {
            scan_start: 1,
            scan_end_exclusive: 3,
            morsel_size: 4,
            parallel_workers: 1,
        },
        &mut handle,
    )
    .unwrap();

    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![1, 2]);
}

#[test]
fn execute_serialized_plan_rejects_project_without_leading_colref() {
    let base = temp_dir("runtime_plexus_project_lead_reject");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    })
    .unwrap();

    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Project {
                input: 0,
                exprs: vec![Expr::PropAccess {
                    col: 0,
                    prop: "adjacency_degree".to_string(),
                }],
                schema: Vec::new(),
            },
            Op::Return { input: 1 },
        ],
        2,
    );
    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut handle).unwrap_err();

    match err {
        ExplainError::UnsupportedSerializedOperator(message) => {
            assert!(message.contains("first expression ColRef(idx=0)"));
        }
        other => panic!(
            "expected unsupported project-leading-expression error, got {:?}",
            other
        ),
    }
}

#[test]
fn execute_serialized_plan_rejects_project_with_unsupported_trailing_expr() {
    let base = temp_dir("runtime_plexus_project_trailing_reject");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    })
    .unwrap();

    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Project {
                input: 0,
                exprs: vec![
                    Expr::ColRef { idx: 0 },
                    Expr::Cmp {
                        op: CmpOp::Eq,
                        lhs: Box::new(Expr::IntLiteral(1)),
                        rhs: Box::new(Expr::IntLiteral(1)),
                    },
                ],
                schema: Vec::new(),
            },
            Op::Return { input: 1 },
        ],
        2,
    );
    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut handle).unwrap_err();

    match err {
        ExplainError::UnsupportedSerializedOperator(message) => {
            assert!(message.contains("Project currently supports only"));
        }
        other => panic!(
            "expected unsupported project-trailing-expression error, got {:?}",
            other
        ),
    }
}