use std::fmt;
use crate::features::storage::api as storage_api;
use plexus_engine::{PlanEngine, QueryResult, Value};
use plexus_serde::{deserialize_plan, Plan};
#[cfg(feature = "rhodium-backend")]
use rhodium::core::storage::blob::{
build_compiled_plan_cache_descriptor, CompiledPlanCacheDescriptor, CompiledPlanCacheRequest,
};
#[cfg(feature = "plexus-mlir-opt")]
use super::optimize::maybe_optimize_serialized_plan;
use super::translate::execute_deserialized_plan;
#[cfg(feature = "rhodium-backend")]
use super::validate::iridium_plexus_capabilities;
use super::validate::{
validate_engine_capabilities, validate_plan_references, validate_supported_version,
};
use super::{
super::execution,
super::{ExecuteParams, ExecutionMode, ExplainError, ExplainPlan, Result, Row, RowStream},
};
/// Value supplied as `execution_engine` in the compiled-plan cache request.
#[cfg(feature = "rhodium-backend")]
const EXECUTION_ENGINE_LABEL: &str = "iridium-runtime";
/// Value supplied as `declared_consumer` in the compiled-plan cache request.
#[cfg(feature = "rhodium-backend")]
const DECLARED_CONSUMER_LABEL: &str = "iridium-runtime";
/// Pipeline label used for the compiled-plan cache when the caller does not
/// name one, in builds that enable `plexus-mlir-opt`.
#[cfg(all(feature = "rhodium-backend", feature = "plexus-mlir-opt"))]
const DEFAULT_PIPELINE_LABEL: &str = "mlir-opt";
/// Pipeline label used for the compiled-plan cache when the caller does not
/// name one, in builds without `plexus-mlir-opt`.
#[cfg(all(feature = "rhodium-backend", not(feature = "plexus-mlir-opt")))]
const DEFAULT_PIPELINE_LABEL: &str = "direct";
/// Executes a serialized Plexus plan and returns the rows it produces.
///
/// Before execution the bytes pass through up to two feature-gated steps:
///
/// 1. With `plexus-mlir-opt`, `maybe_optimize_serialized_plan` is applied.
/// 2. With `rhodium-backend`, the bytes are routed through the compiled-plan
///    cache under `DEFAULT_PIPELINE_LABEL`.
///
/// The prepared bytes are then handed to `execute_serialized_plan_bytes`.
///
/// # Errors
///
/// Propagates any error from the optimization step, the cache step, or
/// `execute_serialized_plan_bytes`.
pub fn execute_serialized_plan(
    bytes: &[u8],
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    // Step 1: optional optimization; otherwise just borrow the input as-is.
    #[cfg(feature = "plexus-mlir-opt")]
    let prepared = maybe_optimize_serialized_plan(bytes)?;
    #[cfg(not(feature = "plexus-mlir-opt"))]
    let prepared = std::borrow::Cow::Borrowed(bytes);

    // Step 2: optional round trip through the compiled-plan cache.
    #[cfg(feature = "rhodium-backend")]
    let prepared = maybe_hydrate_compiled_plan_from_rhodium(
        prepared.as_ref(),
        DEFAULT_PIPELINE_LABEL,
        handle,
    )?;

    execute_serialized_plan_bytes(prepared.as_ref(), params, handle)
}
pub(super) fn execute_serialized_plan_bytes(
bytes: &[u8],
params: &ExecuteParams,
handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
let plan = deserialize_plan(bytes)
.map_err(|error| ExplainError::SerializedPlanMalformed(error.to_string()))?;
validate_supported_version(&plan)?;
validate_engine_capabilities(&plan)?;
validate_plan_references(&plan)?;
let mut engine = PlexusIridiumEngine::new(params, handle);
engine.execute_plan_stream(&plan)
}
/// Test-only variant of `execute_serialized_plan` that lets the caller choose
/// the optimization pipeline.
///
/// `pipeline` is forwarded to `maybe_optimize_serialized_plan_with_pipeline`.
/// With `rhodium-backend`, the same value (or `"mlir-opt"` when it is `None`)
/// labels the compiled-plan cache entry.
///
/// # Errors
///
/// Propagates any error from optimization, the cache step, or execution.
#[cfg(all(test, feature = "plexus-mlir-opt"))]
pub(crate) fn execute_serialized_plan_with_pipeline_override(
    bytes: &[u8],
    pipeline: Option<&'static str>,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    let plan_bytes =
        super::optimize::maybe_optimize_serialized_plan_with_pipeline(bytes, pipeline)?;

    #[cfg(feature = "rhodium-backend")]
    let plan_bytes = {
        let cache_label = pipeline.unwrap_or("mlir-opt");
        maybe_hydrate_compiled_plan_from_rhodium(plan_bytes.as_ref(), cache_label, handle)?
    };

    execute_serialized_plan_bytes(plan_bytes.as_ref(), params, handle)
}
/// Test-only helper that resolves `pipeline_name` with
/// `parse_opt_pipeline_name` and then delegates to
/// `execute_serialized_plan_with_pipeline_override`.
///
/// # Errors
///
/// Returns the parser's error when the name cannot be resolved; otherwise
/// propagates whatever the delegated call returns.
#[cfg(all(test, feature = "plexus-mlir-opt"))]
pub(crate) fn execute_serialized_plan_with_pipeline_name_override(
    bytes: &[u8],
    pipeline_name: &str,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    execute_serialized_plan_with_pipeline_override(
        bytes,
        super::optimize::parse_opt_pipeline_name(pipeline_name)?,
        params,
        handle,
    )
}
/// Builds the compiled-plan cache descriptor for `bytes`.
///
/// When `pipeline` is `None`, the build's `DEFAULT_PIPELINE_LABEL` is used as
/// the pipeline label.
///
/// # Errors
///
/// Returns `ExplainError::SerializedPlanMalformed` when the descriptor cannot
/// be built.
#[cfg(feature = "rhodium-backend")]
pub fn describe_compiled_plan_cache(
    bytes: &[u8],
    pipeline: Option<&str>,
) -> Result<CompiledPlanCacheDescriptor> {
    let label = match pipeline {
        Some(name) => name,
        None => DEFAULT_PIPELINE_LABEL,
    };
    compiled_plan_cache_descriptor(bytes, label)
}
/// Routes plan bytes through the compiled-plan cache when the handle uses the
/// Rhodium blob backend.
///
/// For any other backend the input is returned as an owned copy. For Rhodium,
/// the bytes are first stored under the descriptor derived from `bytes` and
/// `pipeline`, then read back with a local-first tier policy and local
/// rehydration enabled. The bytes that were read back are returned.
///
/// # Errors
///
/// Returns `ExplainError::SerializedPlanMalformed` when the descriptor cannot
/// be built, when either storage call fails, or when the read returns no
/// artifact.
#[cfg(feature = "rhodium-backend")]
fn maybe_hydrate_compiled_plan_from_rhodium(
    bytes: &[u8],
    pipeline: &str,
    handle: &mut storage_api::StorageHandle,
) -> Result<Vec<u8>> {
    // Non-Rhodium backends skip the cache entirely.
    if handle.blob_backend != storage_api::BlobBackend::Rhodium {
        return Ok(bytes.to_vec());
    }

    let cache_key = compiled_plan_cache_descriptor(bytes, pipeline)?;

    // Write first so the subsequent read is expected to find the artifact.
    let put_options = storage_api::BlobPutOptions::default();
    storage_api::put_compiled_plan_with_options(handle, &cache_key, bytes, put_options)
        .map_err(map_storage_error)?;

    let read_options = storage_api::BlobReadOptions {
        tier_policy: storage_api::BlobReadTierPolicy::LocalFirst,
        rehydrate_local: true,
    };
    let lookup = storage_api::get_compiled_plan_with_options(handle, &cache_key, read_options)
        .map_err(map_storage_error)?;

    match lookup {
        Some(artifact) => Ok(artifact.data),
        None => Err(ExplainError::SerializedPlanMalformed(
            "compiled-plan cache unexpectedly returned no artifact".to_string(),
        )),
    }
}
/// Builds the cache descriptor for `bytes` under the given `pipeline` label.
///
/// The request always names `EXECUTION_ENGINE_LABEL` as the execution engine
/// and `DECLARED_CONSUMER_LABEL` as the declared consumer, and is evaluated
/// against the capabilities reported by `iridium_plexus_capabilities`.
///
/// # Errors
///
/// Returns `ExplainError::SerializedPlanMalformed` carrying the builder's
/// message when descriptor construction fails.
#[cfg(feature = "rhodium-backend")]
fn compiled_plan_cache_descriptor(
    bytes: &[u8],
    pipeline: &str,
) -> Result<CompiledPlanCacheDescriptor> {
    let capabilities = iridium_plexus_capabilities();
    let request = CompiledPlanCacheRequest {
        execution_engine: EXECUTION_ENGINE_LABEL,
        pipeline,
        declared_consumer: DECLARED_CONSUMER_LABEL,
    };

    match build_compiled_plan_cache_descriptor(bytes, &capabilities, request) {
        Ok(descriptor) => Ok(descriptor),
        Err(error) => Err(ExplainError::SerializedPlanMalformed(error.to_string())),
    }
}
/// Wraps a storage-layer failure from the compiled-plan cache path in
/// `ExplainError::SerializedPlanMalformed`, embedding the error's `Debug`
/// rendering in the message.
#[cfg(feature = "rhodium-backend")]
fn map_storage_error(error: storage_api::StorageError) -> ExplainError {
    let detail = format!("compiled-plan cache path failed: {:?}", error);
    ExplainError::SerializedPlanMalformed(detail)
}
/// Plan engine that executes deserialized Plexus plans against a storage
/// handle, borrowing both its inputs for the lifetime `'a`.
pub(crate) struct PlexusIridiumEngine<'a> {
/// Execution parameters passed through to plan execution.
params: &'a ExecuteParams,
/// Storage handle the plan is executed against; borrowed mutably.
handle: &'a mut storage_api::StorageHandle,
}
impl<'a> PlexusIridiumEngine<'a> {
pub(crate) fn new(
params: &'a ExecuteParams,
handle: &'a mut storage_api::StorageHandle,
) -> Self {
Self { params, handle }
}
fn execute_plan_stream(&mut self, plan: &Plan) -> Result<RowStream> {
execute_deserialized_plan(plan, self.params, self.handle)
}
}
/// Newtype around `ExplainError` that serves as the `PlanEngine::Error` type
/// of `PlexusIridiumEngine`.
#[derive(Debug, Clone)]
pub(crate) struct PlexusEngineError(pub(crate) ExplainError);
impl fmt::Display for PlexusEngineError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
ExplainError::InvalidPlan(message)
| ExplainError::SerializedPlanMalformed(message)
| ExplainError::UnsupportedSerializedOperator(message) => f.write_str(message),
ExplainError::UnsupportedSerializedPlanVersion {
found_version,
supported_major,
max_supported_minor,
} => write!(
f,
"unsupported serialized plan version {}; supported major={}, max minor={}",
found_version, supported_major, max_supported_minor
),
}
}
}
// Empty impl: every `std::error::Error` method keeps its default, so the
// type only needs its `Debug` and `Display` implementations.
impl std::error::Error for PlexusEngineError {}
impl From<PlexusEngineError> for ExplainError {
fn from(value: PlexusEngineError) -> Self {
value.0
}
}
impl PlanEngine for PlexusIridiumEngine<'_> {
type Error = PlexusEngineError;
fn execute_plan(&mut self, plan: &Plan) -> std::result::Result<QueryResult, Self::Error> {
let stream = self.execute_plan_stream(plan).map_err(PlexusEngineError)?;
Ok(row_stream_to_query_result(stream))
}
}
fn row_stream_to_query_result(stream: RowStream) -> QueryResult {
QueryResult {
rows: stream
.rows
.into_iter()
.map(|row| vec![Value::Int(row.node_id as i64)])
.collect(),
}
}
#[allow(dead_code)]
fn query_result_to_row_stream(query_result: QueryResult) -> Result<RowStream> {
let mut rows = Vec::with_capacity(query_result.rows.len());
for value_row in query_result.rows {
let node_id = match value_row.first() {
Some(Value::Int(value)) if *value >= 0 => *value as u64,
other => {
return Err(ExplainError::SerializedPlanMalformed(format!(
"plexus engine produced unsupported row shape: {:?}",
other
)))
}
};
rows.push(Row {
node_id,
has_full: false,
delta_count: 0,
adjacency_degree: 0,
score: None,
aggregate_value: None,
});
}
let scanned_nodes = rows.len() as u64;
Ok(RowStream {
rows,
scanned_nodes,
latency_micros: 0,
morsels_processed: 0,
rerank_batches: 0,
parallel_workers: 1,
})
}
/// Dispatches execution to the native or the Plexus path according to `mode`.
///
/// * `ExecutionMode::Native` runs `native_plan` through `execution::execute`.
/// * `ExecutionMode::Plexus` runs `serialized_plan_bytes` through
///   `execute_serialized_plan`.
///
/// Only the input matching the selected mode is consulted.
///
/// # Errors
///
/// Returns `ExplainError::InvalidPlan` when native mode is selected without a
/// native plan, and `ExplainError::SerializedPlanMalformed` when Plexus mode
/// is selected without serialized bytes. Errors from the chosen execution
/// path are propagated unchanged.
pub fn execute_in_mode(
    mode: ExecutionMode,
    native_plan: Option<&ExplainPlan>,
    serialized_plan_bytes: Option<&[u8]>,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    match mode {
        ExecutionMode::Native => {
            let plan = native_plan.ok_or_else(|| {
                ExplainError::InvalidPlan(
                    "native execution mode requires a native explain plan".to_string(),
                )
            })?;
            execution::execute(plan, params, handle)
        }
        ExecutionMode::Plexus => {
            let bytes = serialized_plan_bytes.ok_or_else(|| {
                ExplainError::SerializedPlanMalformed(
                    "plexus execution mode requires serialized plan bytes".to_string(),
                )
            })?;
            execute_serialized_plan(bytes, params, handle)
        }
    }
}