iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

/// A serialized plan of `scan_nodes()` followed by an `Out` `Expand` and a
/// `Return` is executed over the id range `1..4`.
///
/// The fixture stores nodes 1, 2 and 3; the trailing slice passed to
/// `put_full_node` is presumably the outgoing adjacency list (confirm against
/// `storage_api`). The returned rows must carry `node_id`s `[2, 3, 3]`.
#[test]
fn execute_serialized_plan_executes_one_hop_expand_out() {
    let root = temp_dir("runtime_plexus_expand_out");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    // Fixture: 1 -> {2, 3}, 2 -> {3}, 3 -> {} (flushed before the plan runs).
    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Operator 0 scans, operator 1 expands from it, operator 2 returns the expansion.
    let scan = scan_nodes();
    let expand = Op::Expand {
        input: 0,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::Out,
        schema: vec![],
        src_var: String::from("n"),
        rel_var: String::from("r"),
        dst_var: String::from("m"),
        legal_src_labels: vec![],
        legal_dst_labels: vec![],
        est_degree: 1.0,
        graph_ref: None,
    };
    let operators = vec![scan, expand, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 2);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let ids = stream
        .rows
        .iter()
        .map(|entry| entry.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![2, 3, 3]);
}

/// A `Filter` placed between the scan and an `Out` `Expand` is accepted.
///
/// The predicate keeps rows whose `adjacency_degree` property is `>= 2`.
/// With the fixture below, the returned `node_id`s must be `[3, 4]`.
#[test]
fn execute_serialized_plan_executes_filter_before_expand() {
    let root = temp_dir("runtime_plexus_filter_before_expand");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    // Fixture: only node 2 is stored with two entries in its trailing slice.
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3, 4]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 4, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    let scan = scan_nodes();
    // adjacency_degree(col 0) >= 2
    let degree_at_least_two = Expr::Cmp {
        op: CmpOp::Ge,
        lhs: Box::new(Expr::PropAccess {
            col: 0,
            prop: String::from("adjacency_degree"),
        }),
        rhs: Box::new(Expr::IntLiteral(2)),
    };
    let filter = Op::Filter {
        input: 0,
        predicate: degree_at_least_two,
    };
    // The expand consumes the filter's output (operator index 1).
    let expand = Op::Expand {
        input: 1,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::Out,
        schema: vec![],
        src_var: String::from("n"),
        rel_var: String::from("r"),
        dst_var: String::from("m"),
        legal_src_labels: vec![],
        legal_dst_labels: vec![],
        est_degree: 1.0,
        graph_ref: None,
    };
    let operators = vec![scan, filter, expand, Op::Return { input: 2 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 3);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 5,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let ids = stream
        .rows
        .iter()
        .map(|entry| entry.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![3, 4]);
}

/// A `Filter` that consumes the output of an `Expand` must be rejected.
///
/// The call is expected to fail with
/// `ExplainError::UnsupportedSerializedOperator`, and the carried message
/// must mention "post-expand filtering".
#[test]
fn execute_serialized_plan_rejects_filter_after_expand() {
    let root = temp_dir("runtime_plexus_filter_after_expand_rejected");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    let scan = scan_nodes();
    let expand = Op::Expand {
        input: 0,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::Out,
        schema: vec![],
        src_var: String::from("n"),
        rel_var: String::from("r"),
        dst_var: String::from("m"),
        legal_src_labels: vec![],
        legal_dst_labels: vec![],
        est_degree: 1.0,
        graph_ref: None,
    };
    // The filter reads from the expand (operator index 1) — the unsupported shape.
    let filter = Op::Filter {
        input: 1,
        predicate: Expr::Cmp {
            op: CmpOp::Ge,
            lhs: Box::new(Expr::PropAccess {
                col: 0,
                prop: String::from("adjacency_degree"),
            }),
            rhs: Box::new(Expr::IntLiteral(1)),
        },
    };
    let operators = vec![scan, expand, filter, Op::Return { input: 2 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 3);

    let params = ExecuteParams::default();
    let err = execute_serialized_plan(&plan, &params, &mut store).unwrap_err();

    if let ExplainError::UnsupportedSerializedOperator(message) = &err {
        assert!(message.contains("post-expand filtering"));
    } else {
        panic!(
            "expected unsupported filter-after-expand error, got {:?}",
            err
        );
    }
}

/// An `Expand` with `ExpandDir::In` must be rejected.
///
/// No nodes are written; the store is only opened. The call is expected to
/// fail with `ExplainError::UnsupportedSerializedOperator`, and the carried
/// message must mention "Out direction".
#[test]
fn execute_serialized_plan_rejects_expand_direction_in() {
    let root = temp_dir("runtime_plexus_expand_in_rejected");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    let scan = scan_nodes();
    let inbound_expand = Op::Expand {
        input: 0,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::In,
        schema: vec![],
        src_var: String::from("n"),
        rel_var: String::from("r"),
        dst_var: String::from("m"),
        legal_src_labels: vec![],
        legal_dst_labels: vec![],
        est_degree: 1.0,
        graph_ref: None,
    };
    let operators = vec![scan, inbound_expand, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 2);

    let params = ExecuteParams::default();
    let err = execute_serialized_plan(&plan, &params, &mut store).unwrap_err();

    if let ExplainError::UnsupportedSerializedOperator(message) = &err {
        assert!(message.contains("Out direction"));
    } else {
        panic!(
            "expected unsupported expand direction error, got {:?}",
            err
        );
    }
}

/// An `OptionalExpand` in the `Out` direction is executed over ids `1..4`.
///
/// Only node 1 is stored with a non-empty trailing slice (`[2]`); nodes 2 and
/// 3 are stored with empty ones. The expected `node_id`s are `[2, 2, 3]`,
/// which per the test name includes fallback rows for inputs that have
/// nothing to expand to.
#[test]
fn execute_serialized_plan_executes_optional_expand_out_with_fallback_rows() {
    let root = temp_dir("runtime_plexus_optional_expand_out");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    let scan = scan_nodes();
    let optional_expand = Op::OptionalExpand {
        input: 0,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::Out,
        schema: vec![],
        src_var: String::from("n"),
        rel_var: String::from("r"),
        dst_var: String::from("m"),
        legal_src_labels: vec![],
        legal_dst_labels: vec![],
        graph_ref: None,
    };
    let operators = vec![scan, optional_expand, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 2);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let ids = stream
        .rows
        .iter()
        .map(|entry| entry.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![2, 2, 3]);
}

/// A `SemiExpand` in the `Out` direction is executed over ids `1..4`.
///
/// Nodes 1 and 3 are stored with non-empty trailing slices and node 2 with an
/// empty one; the expected `node_id`s are `[1, 3]`.
#[test]
fn execute_serialized_plan_executes_semi_expand_out() {
    let root = temp_dir("runtime_plexus_semi_expand_out");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[1]).unwrap();
    storage_api::flush(&mut store).unwrap();

    let scan = scan_nodes();
    let semi_expand = Op::SemiExpand {
        input: 0,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::Out,
        schema: vec![],
        legal_src_labels: vec![],
        legal_dst_labels: vec![],
    };
    let operators = vec![scan, semi_expand, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 2);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let ids = stream
        .rows
        .iter()
        .map(|entry| entry.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![1, 3]);
}

/// An `ExpandVarLen` in the `Out` direction with `min_hops = 2` and
/// `max_hops = 3` is executed with the scan restricted to id 1 (`1..2`).
///
/// The fixture stores the chain 1 -> 2 -> 3 -> 4 (assuming the trailing slice
/// of `put_full_node` is the outgoing adjacency list — confirm against
/// `storage_api`). The expected `node_id`s are `[3, 4]`.
#[test]
fn execute_serialized_plan_executes_expand_var_len_out() {
    let root = temp_dir("runtime_plexus_expand_varlen");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[4]).unwrap();
    storage_api::put_full_node(&mut store, 4, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    let scan = scan_nodes();
    let var_len_expand = Op::ExpandVarLen {
        input: 0,
        src_col: 0,
        types: vec![],
        dir: ExpandDir::Out,
        min_hops: 2,
        max_hops: 3,
        schema: vec![],
        src_var: String::from("n"),
        path_var: String::from("p"),
        dst_var: String::from("m"),
        graph_ref: None,
    };
    let operators = vec![scan, var_len_expand, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), operators, 2);

    // Only id 1 is scanned, so every result originates from that node.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 2,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let ids = stream
        .rows
        .iter()
        .map(|entry| entry.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![3, 4]);
}