iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

/// Input bytes that are not a FlatBuffer must be refused with
/// `ExplainError::SerializedPlanMalformed`, and the message has to mention
/// "invalid flatbuffer".
#[test]
fn execute_serialized_plan_rejects_invalid_magic() {
    let root = temp_dir("runtime_plexus_invalid_magic");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();

    // The payload is an arbitrary ASCII string, so decoding should fail before
    // any operator is looked at.
    let params = ExecuteParams::default();
    let outcome = execute_serialized_plan(b"not-a-flatbuffer", &params, &mut handle);

    match outcome.unwrap_err() {
        ExplainError::SerializedPlanMalformed(message) => {
            assert!(message.contains("invalid flatbuffer"));
        }
        unexpected => panic!("expected malformed plan error, got {:?}", unexpected),
    }
}

/// A plan serialized with the version tuple `(2, 0, 0)` must be rejected with
/// `ExplainError::UnsupportedSerializedPlanVersion`, reporting the found
/// version string together with the supported major / maximum minor values.
#[test]
fn execute_serialized_plan_rejects_major_version_mismatch() {
    let root = temp_dir("runtime_plexus_version_gate");
    let wal_dir = root.join("wal");
    let manifest_path = root.join("ir.manifest");
    let sstable_dir = root.join("sst");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir,
        wal_segment_max_bytes: 1 << 20,
        manifest_path,
        sstable_dir,
    })
    .unwrap();

    // Scan -> Limit -> Return; the operators themselves are well-formed, only
    // the version stamp is expected to trip the gate.
    let ops = vec![
        scan_nodes(),
        Op::Limit {
            input: 0,
            count: 2,
            skip: 0,
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((2, 0, 0), ops, 2);

    let params = ExecuteParams {
        scan_start: 0,
        scan_end_exclusive: 16,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let err = execute_serialized_plan(&plan, &params, &mut handle).unwrap_err();

    match err {
        ExplainError::UnsupportedSerializedPlanVersion {
            found_version: found,
            supported_major: major,
            max_supported_minor: minor,
        } => {
            assert_eq!(found, "2.0.0");
            assert_eq!(major, 0_u32);
            assert_eq!(minor, 3_u32);
        }
        unexpected => panic!("expected unsupported version error, got {:?}", unexpected),
    }
}

/// A `Return` whose `input` index (3) points past the two operators in the plan
/// must surface as `ExplainError::SerializedPlanMalformed` with a message
/// containing "invalid edge reference".
#[test]
fn execute_serialized_plan_rejects_invalid_op_reference() {
    let root = temp_dir("runtime_plexus_invalid_ref");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();

    // Only indices 0 and 1 exist in this plan, so `input: 3` dangles.
    let ops = vec![scan_nodes(), Op::Return { input: 3 }];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 1);

    let params = ExecuteParams {
        scan_start: 0,
        scan_end_exclusive: 16,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let err = execute_serialized_plan(&plan, &params, &mut handle).unwrap_err();

    match &err {
        ExplainError::SerializedPlanMalformed(message) => {
            assert!(message.contains("invalid edge reference"));
        }
        unexpected => panic!("expected invalid edge reference error, got {:?}", unexpected),
    }
}

/// Runs a Scan -> Project -> Limit(3) -> Return plan over eight stored nodes
/// and expects exactly the rows for node ids 1, 2 and 3, in that order.
#[test]
fn execute_serialized_plan_executes_minimal_readonly_subset() {
    let root = temp_dir("runtime_plexus_readonly_subset");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();

    // Seed nodes 1..=8, each with a single adjacency entry `id + 1`, then flush.
    for id in 1..=8 {
        let adjacency = [id + 1];
        storage_api::put_full_node(&mut handle, id, 1, &adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();

    let ops = vec![
        scan_nodes(),
        Op::Project {
            input: 0,
            exprs: vec![],
            schema: vec![],
        },
        Op::Limit {
            input: 1,
            count: 3,
            skip: 0,
        },
        Op::Return { input: 2 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 3);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 9,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();

    assert_eq!(stream.rows.len(), 3);
    let mut seen: Vec<u64> = Vec::new();
    for row in stream.rows.iter() {
        seen.push(row.node_id);
    }
    assert_eq!(seen, vec![1, 2, 3]);
}

/// With the `plexus-mlir-opt` feature enabled, executing a Scan -> Limit(3) ->
/// Return plan with the `PIPELINE_CORE` override must still produce the rows
/// for node ids 1, 2 and 3.
#[cfg(feature = "plexus-mlir-opt")]
#[test]
fn execute_serialized_plan_core_optimizer_pipeline_executes_minimal_readonly_subset() {
    let root = temp_dir("runtime_plexus_opt_core");
    let wal_dir = root.join("wal");
    let manifest_path = root.join("ir.manifest");
    let sstable_dir = root.join("sst");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir,
        wal_segment_max_bytes: 1 << 20,
        manifest_path,
        sstable_dir,
    })
    .unwrap();

    // Seed nodes 1..=8, each with a single adjacency entry `id + 1`, then flush.
    for id in 1..=8 {
        let adjacency = [id + 1];
        storage_api::put_full_node(&mut handle, id, 1, &adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();

    let ops = vec![
        scan_nodes(),
        Op::Limit {
            input: 0,
            count: 3,
            skip: 0,
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 9,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan_with_pipeline_override(
        &plan,
        Some(PIPELINE_CORE),
        &params,
        &mut handle,
    )
    .unwrap();

    assert_eq!(stream.rows.len(), 3);
    let mut seen: Vec<u64> = Vec::new();
    for row in stream.rows.iter() {
        seen.push(row.node_id);
    }
    assert_eq!(seen, vec![1, 2, 3]);
}

/// With the `plexus-mlir-opt` feature enabled, an unrecognised pipeline name
/// ("banana") must be rejected with `ExplainError::InvalidPlan`, and the
/// message has to mention `IR_PLEXUS_OPT_PIPELINE`.
#[cfg(feature = "plexus-mlir-opt")]
#[test]
fn execute_serialized_plan_rejects_unknown_optimizer_pipeline_name() {
    let root = temp_dir("runtime_plexus_opt_bad_pipeline");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();

    let ops = vec![scan_nodes(), Op::Return { input: 0 }];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 1);

    let params = ExecuteParams::default();
    let outcome =
        execute_serialized_plan_with_pipeline_name_override(&plan, "banana", &params, &mut handle);
    let err = outcome.unwrap_err();

    match &err {
        ExplainError::InvalidPlan(message) => {
            assert!(message.contains("IR_PLEXUS_OPT_PIPELINE"));
        }
        unexpected => panic!("expected invalid plan error, got {:?}", unexpected),
    }
}

/// Deserializes a Scan -> Limit(3) -> Return plan and runs it through
/// `PlanEngine::execute_plan` on a `PlexusIridiumEngine`, expecting the first
/// value of each output row to be the integers 1, 2 and 3.
#[test]
fn plexus_iridium_engine_executes_deserialized_plan_directly() {
    let root = temp_dir("runtime_plexus_engine_direct");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();

    // Seed nodes 1..=8, each with a single adjacency entry `id + 1`, then flush.
    for id in 1..=8 {
        let adjacency = [id + 1];
        storage_api::put_full_node(&mut handle, id, 1, &adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();

    let ops = vec![
        scan_nodes(),
        Op::Limit {
            input: 0,
            count: 3,
            skip: 0,
        },
        Op::Return { input: 1 },
    ];
    let encoded = serialized_plan_bytes((1, 0, 0), ops, 2);
    let plan = deserialize_plan(&encoded).unwrap();

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 9,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let mut engine = PlexusIridiumEngine::new(&params, &mut handle);

    let out = PlanEngine::execute_plan(&mut engine, &plan).unwrap();

    // Only the leading value of each row is inspected; anything other than an
    // integer there is treated as a test failure.
    let mut rows: Vec<u64> = Vec::new();
    for row in out.rows {
        let id = match row.first() {
            Some(::plexus_engine::Value::Int(value)) => *value as u64,
            other => panic!("unexpected row shape: {:?}", other),
        };
        rows.push(id);
    }

    assert_eq!(rows, vec![1, 2, 3]);
}

/// Stores three nodes with one, two and three adjacency entries and filters on
/// `adjacency_degree >= 2`; only node ids 2 and 3 are expected to come back.
#[test]
fn execute_serialized_plan_applies_scalar_filter_predicate() {
    let root = temp_dir("runtime_plexus_scalar_filter");
    let wal_dir = root.join("wal");
    let manifest_path = root.join("ir.manifest");
    let sstable_dir = root.join("sst");
    let mut handle = storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir,
        wal_segment_max_bytes: 1 << 20,
        manifest_path,
        sstable_dir,
    })
    .unwrap();

    // Node 1 has one adjacency entry, node 2 has two, node 3 has three.
    storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut handle, 2, 1, &[3, 4]).unwrap();
    storage_api::put_full_node(&mut handle, 3, 1, &[5, 6, 7]).unwrap();
    storage_api::flush(&mut handle).unwrap();

    // Predicate: <column 0>.adjacency_degree >= 2
    let degree = Expr::PropAccess {
        col: 0,
        prop: String::from("adjacency_degree"),
    };
    let at_least_two = Expr::Cmp {
        op: CmpOp::Ge,
        lhs: Box::new(degree),
        rhs: Box::new(Expr::IntLiteral(2)),
    };
    let ops = vec![
        scan_nodes(),
        Op::Filter {
            input: 0,
            predicate: at_least_two,
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();

    let mut survivors: Vec<u64> = Vec::new();
    for row in stream.rows.iter() {
        survivors.push(row.node_id);
    }
    assert_eq!(survivors, vec![2, 3]);
}

/// A plan containing an `Op::CreateNode` must be refused with
/// `ExplainError::UnsupportedSerializedOperator`, and the message has to name
/// "CreateNode".
#[test]
fn execute_serialized_plan_rejects_unsupported_op_capability() {
    let root = temp_dir("runtime_plexus_unsupported_capability");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();

    // The operator under test: a node creation fed by the scan at index 0.
    let create = Op::CreateNode {
        input: 0,
        labels: vec![],
        props: Expr::NullLiteral,
        schema: vec![],
        out_var: String::from("n"),
    };
    let ops = vec![scan_nodes(), create, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    let params = ExecuteParams::default();
    let err = execute_serialized_plan(&plan, &params, &mut handle).unwrap_err();

    match &err {
        ExplainError::UnsupportedSerializedOperator(message) => {
            assert!(message.contains("CreateNode"));
        }
        unexpected => panic!("expected unsupported capability error, got {:?}", unexpected),
    }
}