iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use std::collections::BTreeSet;

use plexus_engine::{
    check_version_compat, validate_plan_against_capabilities, CapabilityError, EngineCapabilities,
    ExprKind, OpKind, PlanSemver, VersionRange,
};
use plexus_serde::{Op, Plan, PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR};

use super::super::{ExplainError, Result};

const SUPPORTED_MAJOR: u32 = PLAN_FORMAT_MAJOR;
const MAX_SUPPORTED_MINOR: u32 = PLAN_FORMAT_MINOR;

pub(super) fn validate_supported_version(plan: &Plan) -> Result<()> {
    let range = VersionRange::new(
        PlanSemver::new(SUPPORTED_MAJOR, 0, 0),
        PlanSemver::new(SUPPORTED_MAJOR, MAX_SUPPORTED_MINOR, u32::MAX),
    );
    check_version_compat(&plan.version, range).map_err(map_capability_error)
}

pub(super) fn validate_engine_capabilities(plan: &Plan) -> Result<()> {
    validate_plan_against_capabilities(plan, &iridium_plexus_capabilities())
        .map_err(map_capability_error)
}

pub(super) fn validate_plan_references(plan: &Plan) -> Result<()> {
    if plan.ops.is_empty() {
        return Err(ExplainError::SerializedPlanMalformed(
            "plexus plan has no ops".to_string(),
        ));
    }
    let root = plan.root_op as usize;
    if root >= plan.ops.len() {
        return Err(ExplainError::SerializedPlanMalformed(format!(
            "invalid root op index {} for op count {}",
            plan.root_op,
            plan.ops.len()
        )));
    }
    for (index, op) in plan.ops.iter().enumerate() {
        for input_ref in op_input_refs(op) {
            if input_ref as usize >= plan.ops.len() {
                return Err(ExplainError::SerializedPlanMalformed(format!(
                    "invalid edge reference {}->{} for op count {}",
                    input_ref,
                    index,
                    plan.ops.len()
                )));
            }
        }
    }
    Ok(())
}

pub(crate) fn iridium_plexus_capabilities() -> EngineCapabilities {
    EngineCapabilities {
        version_range: VersionRange::new(
            PlanSemver::new(SUPPORTED_MAJOR, 0, 0),
            PlanSemver::new(SUPPORTED_MAJOR, MAX_SUPPORTED_MINOR, u32::MAX),
        ),
        supported_ops: BTreeSet::from([
            OpKind::ScanNodes,
            OpKind::Expand,
            OpKind::OptionalExpand,
            OpKind::SemiExpand,
            OpKind::ExpandVarLen,
            OpKind::Filter,
            OpKind::Project,
            OpKind::Sort,
            OpKind::Limit,
            OpKind::Aggregate,
            OpKind::Unwind,
            OpKind::PathConstruct,
            OpKind::Union,
            OpKind::Return,
        ]),
        supported_exprs: BTreeSet::from([
            ExprKind::ColRef,
            ExprKind::PropAccess,
            ExprKind::IntLiteral,
            ExprKind::FloatLiteral,
            ExprKind::BoolLiteral,
            ExprKind::Cmp,
            ExprKind::ListLiteral,
            ExprKind::Agg,
        ]),
        supports_graph_ref: false,
        supports_multi_graph: false,
        supports_graph_params: false,
    }
}

fn map_capability_error(error: CapabilityError) -> ExplainError {
    match error {
        CapabilityError::UnsupportedPlanVersion {
            plan_major,
            plan_minor,
            plan_patch,
            ..
        } => ExplainError::UnsupportedSerializedPlanVersion {
            found_version: format!("{}.{}.{}", plan_major, plan_minor, plan_patch),
            supported_major: SUPPORTED_MAJOR,
            max_supported_minor: MAX_SUPPORTED_MINOR,
        },
        CapabilityError::MissingFeatureSupport {
            missing_ops,
            missing_exprs,
        } => {
            let missing_ops = missing_ops
                .into_iter()
                .map(|kind| format!("{:?}", kind))
                .collect::<Vec<_>>();
            let missing_exprs = missing_exprs
                .into_iter()
                .map(|kind| format!("{:?}", kind))
                .collect::<Vec<_>>();
            ExplainError::UnsupportedSerializedOperator(format!(
                "plan uses unsupported capabilities (ops: [{}], exprs: [{}])",
                missing_ops.join(", "),
                missing_exprs.join(", ")
            ))
        }
        other => ExplainError::SerializedPlanMalformed(other.to_string()),
    }
}

pub(super) fn op_input_refs(op: &Op) -> Vec<u32> {
    match op {
        Op::ScanNodes { .. } | Op::ScanRels { .. } | Op::ConstRow => Vec::new(),
        Op::Union { lhs, rhs, .. } => vec![*lhs, *rhs],
        Op::Expand { input, .. }
        | Op::OptionalExpand { input, .. }
        | Op::SemiExpand { input, .. }
        | Op::ExpandVarLen { input, .. }
        | Op::Filter { input, .. }
        | Op::BlockMarker { input, .. }
        | Op::Project { input, .. }
        | Op::Aggregate { input, .. }
        | Op::Sort { input, .. }
        | Op::Limit { input, .. }
        | Op::Unwind { input, .. }
        | Op::PathConstruct { input, .. }
        | Op::CreateNode { input, .. }
        | Op::CreateRel { input, .. }
        | Op::Merge { input, .. }
        | Op::Delete { input, .. }
        | Op::SetProperty { input, .. }
        | Op::RemoveProperty { input, .. }
        | Op::VectorScan { input, .. }
        | Op::Rerank { input, .. }
        | Op::Return { input } => vec![*input],
    }
}

pub(super) fn op_name(op: &Op) -> &'static str {
    match op {
        Op::ScanNodes { .. } => "ScanNodes",
        Op::ScanRels { .. } => "ScanRels",
        Op::Expand { .. } => "Expand",
        Op::OptionalExpand { .. } => "OptionalExpand",
        Op::SemiExpand { .. } => "SemiExpand",
        Op::ExpandVarLen { .. } => "ExpandVarLen",
        Op::Filter { .. } => "Filter",
        Op::BlockMarker { .. } => "BlockMarker",
        Op::Project { .. } => "Project",
        Op::Aggregate { .. } => "Aggregate",
        Op::Sort { .. } => "Sort",
        Op::Limit { .. } => "Limit",
        Op::Unwind { .. } => "Unwind",
        Op::PathConstruct { .. } => "PathConstruct",
        Op::Union { .. } => "Union",
        Op::CreateNode { .. } => "CreateNode",
        Op::CreateRel { .. } => "CreateRel",
        Op::Merge { .. } => "Merge",
        Op::Delete { .. } => "Delete",
        Op::SetProperty { .. } => "SetProperty",
        Op::RemoveProperty { .. } => "RemoveProperty",
        Op::VectorScan { .. } => "VectorScan",
        Op::Rerank { .. } => "Rerank",
        Op::Return { .. } => "Return",
        Op::ConstRow => "ConstRow",
    }
}