iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use std::borrow::Cow;

use plexus_serde::{deserialize_plan, serialize_plan, ColDef, ColKind, LogicalType, Op, Plan};

use crate::features::storage::api as storage_api;

use super::super::{ExecuteParams, ExplainError, Result, RowStream};
use super::engine::execute_serialized_plan_bytes;

use plexus_ir::{
    flatbuffer_to_module, make_context, module_to_flatbuffer, optimize, PIPELINE_CORE,
    PIPELINE_FULL, PIPELINE_GRAPH,
};

/// Optimizes a serialized plan with the pipeline selected by `IR_PLEXUS_OPT_PIPELINE`.
///
/// When the variable is unset, or selects a disabled value such as `off`, the input bytes
/// are handed back borrowed and unchanged.
///
/// # Errors
/// Propagates errors from resolving the configured pipeline name and from the
/// optimization itself.
pub(super) fn maybe_optimize_serialized_plan<'a>(bytes: &'a [u8]) -> Result<Cow<'a, [u8]>> {
    let selected_pipeline = configured_opt_pipeline()?;
    maybe_optimize_serialized_plan_with_pipeline(bytes, selected_pipeline)
}

/// Optimizes a serialized plan with an explicitly chosen plexus pipeline.
///
/// Passing `None` short-circuits and returns the original bytes as `Cow::Borrowed`.
/// Otherwise the plan goes through four steps:
/// 1. It is normalized for the MLIR bridge.
/// 2. It is lifted into a plexus IR module.
/// 3. The module is optimized in place with `pipeline`.
/// 4. The module is lowered back to flatbuffer bytes, returned as `Cow::Owned`.
///
/// # Errors
/// * `ExplainError::SerializedPlanMalformed` if decoding or normalizing the plan fails,
///   if lifting it into a module fails, or if lowering the optimized module fails.
/// * `ExplainError::InvalidPlan` if the optimizer reports an error.
pub(super) fn maybe_optimize_serialized_plan_with_pipeline<'a>(
    bytes: &'a [u8],
    pipeline: Option<&'static str>,
) -> Result<Cow<'a, [u8]>> {
    let pipeline_name = match pipeline {
        Some(name) => name,
        None => return Ok(Cow::Borrowed(bytes)),
    };

    let bridge_bytes = bridge_safe_plan_bytes(bytes)?;
    let context = make_context();
    let mut ir_module = flatbuffer_to_module(&bridge_bytes, &context)
        .map_err(|err| ExplainError::SerializedPlanMalformed(err.to_string()))?;

    optimize(&mut ir_module, &context, pipeline_name)
        .map_err(|err| ExplainError::InvalidPlan(format!("plexus optimize failed: {err}")))?;

    module_to_flatbuffer(&ir_module)
        .map(Cow::Owned)
        .map_err(|err| ExplainError::SerializedPlanMalformed(err.to_string()))
}

/// Executes a serialized plan after optimizing it with an explicitly chosen pipeline.
///
/// This bypasses the `IR_PLEXUS_OPT_PIPELINE` environment variable. With
/// `pipeline == None` the plan bytes are executed exactly as given.
///
/// # Errors
/// Propagates optimization errors and any error from executing the resulting plan.
// NOTE(review): `dead_code` is presumably allowed because only some build configurations
// (e.g. tests or benches) call this function — confirm before removing the attribute.
#[allow(dead_code)]
pub(crate) fn execute_serialized_plan_with_pipeline_override(
    bytes: &[u8],
    pipeline: Option<&'static str>,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    let plan_bytes = maybe_optimize_serialized_plan_with_pipeline(bytes, pipeline)?;
    execute_serialized_plan_bytes(&*plan_bytes, params, handle)
}

/// Maps an optimization-pipeline name to the corresponding plexus pipeline.
///
/// The name is trimmed and then matched case-insensitively (ASCII). This is the same
/// normalization `configured_opt_pipeline` applies to `IR_PLEXUS_OPT_PIPELINE`, so direct
/// callers and the environment path accept the same spellings.
///
/// * `""`, `off`, `0`, `false` → `Ok(None)` (optimization disabled)
/// * `core` → `PIPELINE_CORE`, `graph` → `PIPELINE_GRAPH`, `full` → `PIPELINE_FULL`
///
/// # Errors
/// Returns `ExplainError::InvalidPlan` for any other value. The message echoes the
/// normalized name.
pub(super) fn parse_opt_pipeline_name(raw: &str) -> Result<Option<&'static str>> {
    // Normalize here rather than relying on every caller to do it; this is a no-op for
    // input that has already been trimmed and lowercased.
    let name = raw.trim().to_ascii_lowercase();
    match name.as_str() {
        "" | "off" | "0" | "false" => Ok(None),
        "core" => Ok(Some(PIPELINE_CORE)),
        "graph" => Ok(Some(PIPELINE_GRAPH)),
        "full" => Ok(Some(PIPELINE_FULL)),
        other => Err(ExplainError::InvalidPlan(format!(
            "unsupported IR_PLEXUS_OPT_PIPELINE '{other}'; expected off|core|graph|full"
        ))),
    }
}

fn configured_opt_pipeline() -> Result<Option<&'static str>> {
    let raw = match std::env::var("IR_PLEXUS_OPT_PIPELINE") {
        Ok(value) => value.trim().to_ascii_lowercase(),
        Err(_) => return Ok(None),
    };
    parse_opt_pipeline_name(&raw)
}

fn bridge_safe_plan_bytes(bytes: &[u8]) -> Result<Vec<u8>> {
    let mut plan = deserialize_plan(bytes)
        .map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))?;
    normalize_plan_for_mlir_bridge(&mut plan);
    serialize_plan(&plan).map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))
}

fn normalize_plan_for_mlir_bridge(plan: &mut Plan) {
    for op in &mut plan.ops {
        normalize_op_schema_for_mlir_bridge(op);
    }
}

/// Normalizes every column definition in the schema carried by `op`, if it has one.
///
/// The following operators carry no schema and are left untouched:
/// `Filter`, `BlockMarker`, `Sort`, `Limit`, `Return`, `ConstRow`.
/// The match is exhaustive (no `_` arm), so adding an `Op` variant forces a decision here.
fn normalize_op_schema_for_mlir_bridge(op: &mut Op) {
    match op {
        Op::ScanNodes { schema, .. }
        | Op::ScanRels { schema, .. }
        | Op::Expand { schema, .. }
        | Op::OptionalExpand { schema, .. }
        | Op::SemiExpand { schema, .. }
        | Op::ExpandVarLen { schema, .. }
        | Op::Project { schema, .. }
        | Op::Aggregate { schema, .. }
        | Op::Unwind { schema, .. }
        | Op::PathConstruct { schema, .. }
        | Op::Union { schema, .. }
        | Op::CreateNode { schema, .. }
        | Op::CreateRel { schema, .. }
        | Op::Merge { schema, .. }
        | Op::Delete { schema, .. }
        | Op::SetProperty { schema, .. }
        | Op::RemoveProperty { schema, .. }
        | Op::VectorScan { schema, .. }
        | Op::Rerank { schema, .. } => {
            for column in schema {
                normalize_coldef_for_mlir_bridge(column);
            }
        }
        Op::Filter { .. }
        | Op::BlockMarker { .. }
        | Op::Sort { .. }
        | Op::Limit { .. }
        | Op::Return { .. }
        | Op::ConstRow => {}
    }
}

/// Resets `col.logical_type` to `LogicalType::Unknown` when it only restates the column kind.
///
/// Only three pairs are cleared: `Node`/`NodeRef`, `Rel`/`RelRef`, and `Path`/`Path`.
/// Every other combination is kept as-is.
// NOTE(review): presumably the MLIR bridge cannot round-trip these reference logical types
// — confirm against the plexus bridge before changing the set of cleared pairs.
fn normalize_coldef_for_mlir_bridge(col: &mut ColDef) {
    let restates_kind = match (&col.kind, &col.logical_type) {
        (ColKind::Node, LogicalType::NodeRef) => true,
        (ColKind::Rel, LogicalType::RelRef) => true,
        (ColKind::Path, LogicalType::Path) => true,
        _ => false,
    };
    if restates_kind {
        col.logical_type = LogicalType::Unknown;
    }
}