use std::borrow::Cow;
use plexus_serde::{deserialize_plan, serialize_plan, ColDef, ColKind, LogicalType, Op, Plan};
use crate::features::storage::api as storage_api;
use super::super::{ExecuteParams, ExplainError, Result, RowStream};
use super::engine::execute_serialized_plan_bytes;
use plexus_ir::{
flatbuffer_to_module, make_context, module_to_flatbuffer, optimize, PIPELINE_CORE,
PIPELINE_FULL, PIPELINE_GRAPH,
};
/// Optimizes a serialized plan with the pipeline selected by
/// `configured_opt_pipeline`.
///
/// Delegates to [`maybe_optimize_serialized_plan_with_pipeline`]; when no
/// pipeline is configured the input bytes are handed back borrowed.
///
/// # Errors
///
/// Propagates any error from resolving the configured pipeline or from the
/// optimization itself.
pub(super) fn maybe_optimize_serialized_plan<'a>(bytes: &'a [u8]) -> Result<Cow<'a, [u8]>> {
    // Resolve the configuration first so a bad setting is reported before any
    // plan bytes are touched.
    let selected_pipeline = configured_opt_pipeline()?;
    maybe_optimize_serialized_plan_with_pipeline(bytes, selected_pipeline)
}
pub(super) fn maybe_optimize_serialized_plan_with_pipeline<'a>(
bytes: &'a [u8],
pipeline: Option<&'static str>,
) -> Result<Cow<'a, [u8]>> {
let Some(pipeline) = pipeline else {
return Ok(Cow::Borrowed(bytes));
};
let normalized_bytes = bridge_safe_plan_bytes(bytes)?;
let ctx = make_context();
let mut module = flatbuffer_to_module(&normalized_bytes, &ctx)
.map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))?;
optimize(&mut module, &ctx, pipeline)
.map_err(|error| ExplainError::InvalidPlan(format!("plexus optimize failed: {error}")))?;
let optimized = module_to_flatbuffer(&module)
.map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))?;
Ok(Cow::Owned(optimized))
}
/// Optimizes a serialized plan with an explicitly chosen pipeline and then
/// executes it.
///
/// Unlike [`maybe_optimize_serialized_plan`], the pipeline comes from the
/// `pipeline` argument rather than from configuration; `None` executes the
/// plan bytes as given.
///
/// # Errors
///
/// Propagates any error from the optimization step or from
/// `execute_serialized_plan_bytes`.
// NOTE(review): `dead_code` is allowed presumably because this entry point is
// only reached from tests or tooling in some build configurations — confirm.
#[allow(dead_code)]
pub(crate) fn execute_serialized_plan_with_pipeline_override(
    bytes: &[u8],
    pipeline: Option<&'static str>,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    let plan_bytes = maybe_optimize_serialized_plan_with_pipeline(bytes, pipeline)?;
    execute_serialized_plan_bytes(&*plan_bytes, params, handle)
}
/// Maps a pipeline setting string onto one of the known optimizer pipelines.
///
/// * `""`, `"off"`, `"0"` and `"false"` yield `Ok(None)` (optimization off).
/// * `"core"`, `"graph"` and `"full"` yield the matching `PIPELINE_*` constant.
///
/// The comparison is exact: this function does not trim or change the case of
/// `raw`.
///
/// # Errors
///
/// Returns `ExplainError::InvalidPlan` for any other value.
pub(super) fn parse_opt_pipeline_name(raw: &str) -> Result<Option<&'static str>> {
    // All spellings that switch the optimizer off.
    if matches!(raw, "" | "off" | "0" | "false") {
        return Ok(None);
    }

    let pipeline = match raw {
        "core" => PIPELINE_CORE,
        "graph" => PIPELINE_GRAPH,
        "full" => PIPELINE_FULL,
        other => {
            return Err(ExplainError::InvalidPlan(format!(
                "unsupported IR_PLEXUS_OPT_PIPELINE '{other}'; expected off|core|graph|full"
            )))
        }
    };
    Ok(Some(pipeline))
}
/// Reads the optimizer pipeline selection from the `IR_PLEXUS_OPT_PIPELINE`
/// environment variable.
///
/// An unset variable means "no optimization" (`Ok(None)`). A set variable is
/// trimmed, ASCII-lowercased and handed to [`parse_opt_pipeline_name`].
///
/// # Errors
///
/// Returns whatever [`parse_opt_pipeline_name`] returns for an unrecognised
/// value. This includes values that are not valid Unicode: they are no longer
/// silently treated as "unset" but reported like any other unsupported
/// setting.
fn configured_opt_pipeline() -> Result<Option<&'static str>> {
    // `var_os` separates "unset" from "set but not valid Unicode";
    // `std::env::var` folds both into `Err`, which used to disable the
    // optimizer silently for a garbled value.
    let Some(raw) = std::env::var_os("IR_PLEXUS_OPT_PIPELINE") else {
        return Ok(None);
    };
    // Lossy conversion replaces invalid sequences with U+FFFD, so a
    // non-Unicode value can never match a known name and is rejected by the
    // parser with the usual error message.
    let normalized = raw.to_string_lossy().trim().to_ascii_lowercase();
    parse_opt_pipeline_name(&normalized)
}
fn bridge_safe_plan_bytes(bytes: &[u8]) -> Result<Vec<u8>> {
let mut plan = deserialize_plan(bytes)
.map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))?;
normalize_plan_for_mlir_bridge(&mut plan);
serialize_plan(&plan).map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))
}
fn normalize_plan_for_mlir_bridge(plan: &mut Plan) {
for op in &mut plan.ops {
normalize_op_schema_for_mlir_bridge(op);
}
}
/// Applies [`normalize_coldef_for_mlir_bridge`] to every column of the op's
/// schema, for the op variants whose schema is rewritten here.
///
/// `Filter`, `BlockMarker`, `Sort`, `Limit`, `Return` and `ConstRow` are left
/// untouched.
fn normalize_op_schema_for_mlir_bridge(op: &mut Op) {
    // The match deliberately has no `_` arm: adding a new `Op` variant must
    // force an explicit decision about whether its schema is normalized.
    let columns = match op {
        // Variants that are not rewritten by this pass.
        Op::Filter { .. }
        | Op::BlockMarker { .. }
        | Op::Sort { .. }
        | Op::Limit { .. }
        | Op::Return { .. }
        | Op::ConstRow => return,

        // Variants whose `schema` columns are normalized.
        Op::ScanNodes { schema, .. }
        | Op::ScanRels { schema, .. }
        | Op::Expand { schema, .. }
        | Op::OptionalExpand { schema, .. }
        | Op::SemiExpand { schema, .. }
        | Op::ExpandVarLen { schema, .. }
        | Op::Project { schema, .. }
        | Op::Aggregate { schema, .. }
        | Op::Unwind { schema, .. }
        | Op::PathConstruct { schema, .. }
        | Op::Union { schema, .. }
        | Op::CreateNode { schema, .. }
        | Op::CreateRel { schema, .. }
        | Op::Merge { schema, .. }
        | Op::Delete { schema, .. }
        | Op::SetProperty { schema, .. }
        | Op::RemoveProperty { schema, .. }
        | Op::VectorScan { schema, .. }
        | Op::Rerank { schema, .. } => schema,
    };

    for column in columns {
        normalize_coldef_for_mlir_bridge(column);
    }
}
/// Resets a column's logical type to `LogicalType::Unknown` when its kind and
/// logical type form one of these pairs:
///
/// * `ColKind::Node` with `LogicalType::NodeRef`
/// * `ColKind::Rel` with `LogicalType::RelRef`
/// * `ColKind::Path` with `LogicalType::Path`
///
/// Every other column is left unchanged.
// NOTE(review): presumably the MLIR bridge cannot carry these reference
// logical types — confirm against the bridge implementation.
fn normalize_coldef_for_mlir_bridge(col: &mut ColDef) {
    let needs_reset = match col.kind {
        ColKind::Node => matches!(col.logical_type, LogicalType::NodeRef),
        ColKind::Rel => matches!(col.logical_type, LogicalType::RelRef),
        ColKind::Path => matches!(col.logical_type, LogicalType::Path),
        _ => false,
    };

    if needs_reset {
        col.logical_type = LogicalType::Unknown;
    }
}