iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use std::fmt;

use crate::features::storage::api as storage_api;
use plexus_engine::{PlanEngine, QueryResult, Value};
use plexus_serde::{deserialize_plan, Plan};
#[cfg(feature = "rhodium-backend")]
use rhodium::core::storage::blob::{
    build_compiled_plan_cache_descriptor, CompiledPlanCacheDescriptor, CompiledPlanCacheRequest,
};

#[cfg(feature = "plexus-mlir-opt")]
use super::optimize::maybe_optimize_serialized_plan;
use super::translate::execute_deserialized_plan;
#[cfg(feature = "rhodium-backend")]
use super::validate::iridium_plexus_capabilities;
use super::validate::{
    validate_engine_capabilities, validate_plan_references, validate_supported_version,
};
use super::{
    super::execution,
    super::{ExecuteParams, ExecutionMode, ExplainError, ExplainPlan, Result, Row, RowStream},
};

// Labels fed into the Rhodium compiled-plan cache request built by
// `compiled_plan_cache_descriptor`. All of them exist only when the
// `rhodium-backend` feature is enabled.

/// Value placed in `CompiledPlanCacheRequest::execution_engine`.
#[cfg(feature = "rhodium-backend")]
const EXECUTION_ENGINE_LABEL: &str = "iridium-runtime";
/// Value placed in `CompiledPlanCacheRequest::declared_consumer`.
#[cfg(feature = "rhodium-backend")]
const DECLARED_CONSUMER_LABEL: &str = "iridium-runtime";
/// Pipeline label used when no explicit pipeline is named
/// (`execute_serialized_plan`, `describe_compiled_plan_cache(.., None)`).
/// `"mlir-opt"` when the `plexus-mlir-opt` feature is enabled.
#[cfg(all(feature = "rhodium-backend", feature = "plexus-mlir-opt"))]
const DEFAULT_PIPELINE_LABEL: &str = "mlir-opt";
/// Same role as above; `"direct"` when the `plexus-mlir-opt` feature is off.
#[cfg(all(feature = "rhodium-backend", not(feature = "plexus-mlir-opt")))]
const DEFAULT_PIPELINE_LABEL: &str = "direct";

/// Executes a serialized Plexus plan against `handle`.
///
/// The bytes flow through up to three stages, depending on enabled features:
/// 1. `plexus-mlir-opt`: passed to `maybe_optimize_serialized_plan`;
///    without the feature they are borrowed unchanged.
/// 2. `rhodium-backend`: passed to `maybe_hydrate_compiled_plan_from_rhodium`
///    under `DEFAULT_PIPELINE_LABEL`.
/// 3. Always: deserialized, validated and executed by
///    `execute_serialized_plan_bytes`.
///
/// # Errors
/// Propagates any `ExplainError` raised by optimization, cache hydration,
/// deserialization, validation or execution.
pub fn execute_serialized_plan(
    bytes: &[u8],
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    #[cfg(not(feature = "plexus-mlir-opt"))]
    let plan_bytes = std::borrow::Cow::Borrowed(bytes);
    #[cfg(feature = "plexus-mlir-opt")]
    let plan_bytes = maybe_optimize_serialized_plan(bytes)?;

    #[cfg(feature = "rhodium-backend")]
    let plan_bytes = maybe_hydrate_compiled_plan_from_rhodium(
        plan_bytes.as_ref(),
        DEFAULT_PIPELINE_LABEL,
        handle,
    )?;

    execute_serialized_plan_bytes(plan_bytes.as_ref(), params, handle)
}

/// Deserializes, validates and executes serialized plan bytes as-is.
///
/// No optimization or compiled-plan cache step is applied here.
///
/// # Errors
/// - `ExplainError::SerializedPlanMalformed` when `deserialize_plan` rejects
///   the bytes (carrying the decoder's message).
/// - Any error from the version, capability or reference validators.
/// - Any error raised while executing the plan.
pub(super) fn execute_serialized_plan_bytes(
    bytes: &[u8],
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    let plan = match deserialize_plan(bytes) {
        Ok(decoded) => decoded,
        Err(decode_error) => {
            return Err(ExplainError::SerializedPlanMalformed(
                decode_error.to_string(),
            ))
        }
    };

    // Validation order: version first, then engine capabilities, then references.
    validate_supported_version(&plan)?;
    validate_engine_capabilities(&plan)?;
    validate_plan_references(&plan)?;

    PlexusIridiumEngine::new(params, handle).execute_plan_stream(&plan)
}

/// Test-only variant of [`execute_serialized_plan`] that lets the caller pick
/// the optimization pipeline.
///
/// `pipeline` is forwarded to `maybe_optimize_serialized_plan_with_pipeline`.
/// With `rhodium-backend`, the compiled-plan cache is keyed by the given
/// pipeline name, or by `DEFAULT_PIPELINE_LABEL` when `pipeline` is `None`.
///
/// # Errors
/// Propagates any error from optimization, cache hydration, deserialization,
/// validation or execution.
#[cfg(all(test, feature = "plexus-mlir-opt"))]
pub(crate) fn execute_serialized_plan_with_pipeline_override(
    bytes: &[u8],
    pipeline: Option<&'static str>,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    let bytes = super::optimize::maybe_optimize_serialized_plan_with_pipeline(bytes, pipeline)?;
    // Fall back to the shared default label rather than a duplicated
    // "mlir-opt" literal, so this path keys the cache exactly like
    // `execute_serialized_plan` when no override is supplied.
    #[cfg(feature = "rhodium-backend")]
    let bytes = maybe_hydrate_compiled_plan_from_rhodium(
        bytes.as_ref(),
        pipeline.unwrap_or(DEFAULT_PIPELINE_LABEL),
        handle,
    )?;
    execute_serialized_plan_bytes(bytes.as_ref(), params, handle)
}

/// Test-only helper that resolves a textual pipeline name with
/// `parse_opt_pipeline_name`. The resolved pipeline is then passed to
/// [`execute_serialized_plan_with_pipeline_override`].
///
/// # Errors
/// Returns the name-parsing error when `pipeline_name` is rejected.
/// Otherwise returns whatever the override execution path returns.
#[cfg(all(test, feature = "plexus-mlir-opt"))]
pub(crate) fn execute_serialized_plan_with_pipeline_name_override(
    bytes: &[u8],
    pipeline_name: &str,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    let resolved_pipeline = super::optimize::parse_opt_pipeline_name(pipeline_name)?;
    execute_serialized_plan_with_pipeline_override(bytes, resolved_pipeline, params, handle)
}

/// Returns the compiled-plan cache descriptor that `bytes` maps to for the
/// given pipeline label.
///
/// A `None` pipeline falls back to `DEFAULT_PIPELINE_LABEL`, the same label
/// `execute_serialized_plan` uses.
///
/// # Errors
/// Propagates descriptor-construction failures from
/// `compiled_plan_cache_descriptor`.
#[cfg(feature = "rhodium-backend")]
pub fn describe_compiled_plan_cache(
    bytes: &[u8],
    pipeline: Option<&str>,
) -> Result<CompiledPlanCacheDescriptor> {
    let label = match pipeline {
        Some(name) => name,
        None => DEFAULT_PIPELINE_LABEL,
    };
    compiled_plan_cache_descriptor(bytes, label)
}

/// Writes `bytes` into the Rhodium compiled-plan cache and returns the
/// artifact read back from it.
///
/// If the handle's blob backend is not `BlobBackend::Rhodium`, an owned copy
/// of `bytes` is returned and the cache is not touched.
///
/// Otherwise the bytes are stored under the descriptor for `pipeline` using
/// default put options. They are then read back with the `LocalFirst` tier
/// policy and `rehydrate_local: true`.
///
/// # Errors
/// - Descriptor construction failures from `compiled_plan_cache_descriptor`.
/// - Storage put/get failures, wrapped by `map_storage_error`.
/// - `ExplainError::SerializedPlanMalformed` if the read-back yields no
///   artifact.
#[cfg(feature = "rhodium-backend")]
fn maybe_hydrate_compiled_plan_from_rhodium(
    bytes: &[u8],
    pipeline: &str,
    handle: &mut storage_api::StorageHandle,
) -> Result<Vec<u8>> {
    let uses_rhodium = handle.blob_backend == storage_api::BlobBackend::Rhodium;
    if !uses_rhodium {
        return Ok(bytes.to_vec());
    }

    let descriptor = compiled_plan_cache_descriptor(bytes, pipeline)?;

    if let Err(put_error) = storage_api::put_compiled_plan_with_options(
        handle,
        &descriptor,
        bytes,
        storage_api::BlobPutOptions::default(),
    ) {
        return Err(map_storage_error(put_error));
    }

    let read_options = storage_api::BlobReadOptions {
        tier_policy: storage_api::BlobReadTierPolicy::LocalFirst,
        rehydrate_local: true,
    };
    match storage_api::get_compiled_plan_with_options(handle, &descriptor, read_options) {
        Ok(Some(artifact)) => Ok(artifact.data),
        Ok(None) => Err(ExplainError::SerializedPlanMalformed(
            "compiled-plan cache unexpectedly returned no artifact".to_string(),
        )),
        Err(get_error) => Err(map_storage_error(get_error)),
    }
}

/// Builds the compiled-plan cache descriptor for `bytes` under `pipeline`.
///
/// The request always names `EXECUTION_ENGINE_LABEL` as the execution engine
/// and `DECLARED_CONSUMER_LABEL` as the declared consumer. It is passed to
/// `build_compiled_plan_cache_descriptor` together with
/// `iridium_plexus_capabilities()`.
///
/// # Errors
/// Any builder failure becomes `ExplainError::SerializedPlanMalformed` with
/// the builder error's display text.
#[cfg(feature = "rhodium-backend")]
fn compiled_plan_cache_descriptor(
    bytes: &[u8],
    pipeline: &str,
) -> Result<CompiledPlanCacheDescriptor> {
    let capabilities = iridium_plexus_capabilities();
    let request = CompiledPlanCacheRequest {
        execution_engine: EXECUTION_ENGINE_LABEL,
        pipeline,
        declared_consumer: DECLARED_CONSUMER_LABEL,
    };
    match build_compiled_plan_cache_descriptor(bytes, &capabilities, request) {
        Ok(descriptor) => Ok(descriptor),
        Err(build_error) => Err(ExplainError::SerializedPlanMalformed(
            build_error.to_string(),
        )),
    }
}

/// Converts a storage-layer failure on the compiled-plan cache path into
/// `ExplainError::SerializedPlanMalformed`, embedding the error's `Debug` form.
#[cfg(feature = "rhodium-backend")]
fn map_storage_error(error: storage_api::StorageError) -> ExplainError {
    let detail = format!("compiled-plan cache path failed: {error:?}");
    ExplainError::SerializedPlanMalformed(detail)
}

/// Adapter that runs deserialized Plexus plans against an Iridium storage
/// handle with a fixed set of execution parameters.
///
/// Both the parameters and the handle are borrowed for `'a`. The type also
/// implements `plexus_engine::PlanEngine`.
pub(crate) struct PlexusIridiumEngine<'a> {
    params: &'a ExecuteParams,
    handle: &'a mut storage_api::StorageHandle,
}

impl<'a> PlexusIridiumEngine<'a> {
    /// Creates an engine that borrows `params` and `handle` for `'a`.
    pub(crate) fn new(
        params: &'a ExecuteParams,
        handle: &'a mut storage_api::StorageHandle,
    ) -> Self {
        PlexusIridiumEngine { params, handle }
    }

    /// Executes `plan` via `execute_deserialized_plan` and returns its row stream.
    fn execute_plan_stream(&mut self, plan: &Plan) -> Result<RowStream> {
        let params = self.params;
        execute_deserialized_plan(plan, params, &mut *self.handle)
    }
}

/// Newtype around `ExplainError`, used as the `PlanEngine::Error` type of
/// `PlexusIridiumEngine`. It adds `Display` and `std::error::Error` impls and
/// converts back into `ExplainError` via `From`.
#[derive(Debug, Clone)]
pub(crate) struct PlexusEngineError(pub(crate) ExplainError);

impl fmt::Display for PlexusEngineError {
    /// Message-carrying variants print their message verbatim. The version
    /// variant prints the found version and the supported range.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let PlexusEngineError(inner) = self;
        match inner {
            ExplainError::UnsupportedSerializedPlanVersion {
                found_version,
                supported_major,
                max_supported_minor,
            } => write!(
                f,
                "unsupported serialized plan version {}; supported major={}, max minor={}",
                found_version, supported_major, max_supported_minor
            ),
            ExplainError::InvalidPlan(text)
            | ExplainError::SerializedPlanMalformed(text)
            | ExplainError::UnsupportedSerializedOperator(text) => f.write_str(text),
        }
    }
}

impl std::error::Error for PlexusEngineError {}

impl From<PlexusEngineError> for ExplainError {
    /// Unwraps the newtype and returns the wrapped `ExplainError`.
    fn from(wrapped: PlexusEngineError) -> Self {
        let PlexusEngineError(inner) = wrapped;
        inner
    }
}

impl PlanEngine for PlexusIridiumEngine<'_> {
    type Error = PlexusEngineError;

    fn execute_plan(&mut self, plan: &Plan) -> std::result::Result<QueryResult, Self::Error> {
        let stream = self.execute_plan_stream(plan).map_err(PlexusEngineError)?;
        Ok(row_stream_to_query_result(stream))
    }
}

fn row_stream_to_query_result(stream: RowStream) -> QueryResult {
    QueryResult {
        rows: stream
            .rows
            .into_iter()
            .map(|row| vec![Value::Int(row.node_id as i64)])
            .collect(),
    }
}

#[allow(dead_code)]
fn query_result_to_row_stream(query_result: QueryResult) -> Result<RowStream> {
    let mut rows = Vec::with_capacity(query_result.rows.len());
    for value_row in query_result.rows {
        let node_id = match value_row.first() {
            Some(Value::Int(value)) if *value >= 0 => *value as u64,
            other => {
                return Err(ExplainError::SerializedPlanMalformed(format!(
                    "plexus engine produced unsupported row shape: {:?}",
                    other
                )))
            }
        };
        rows.push(Row {
            node_id,
            has_full: false,
            delta_count: 0,
            adjacency_degree: 0,
            score: None,
            aggregate_value: None,
        });
    }
    let scanned_nodes = rows.len() as u64;
    Ok(RowStream {
        rows,
        scanned_nodes,
        latency_micros: 0,
        morsels_processed: 0,
        rerank_batches: 0,
        parallel_workers: 1,
    })
}

/// Dispatches execution according to `mode`.
///
/// - `ExecutionMode::Native` runs `native_plan` through `execution::execute`.
/// - `ExecutionMode::Plexus` runs `serialized_plan_bytes` through
///   [`execute_serialized_plan`].
///
/// The input the selected mode does not need is ignored.
///
/// # Errors
/// - `ExplainError::InvalidPlan` if Native mode has no `native_plan`.
/// - `ExplainError::SerializedPlanMalformed` if Plexus mode has no
///   `serialized_plan_bytes`.
/// - Otherwise, whatever the selected executor returns.
pub fn execute_in_mode(
    mode: ExecutionMode,
    native_plan: Option<&ExplainPlan>,
    serialized_plan_bytes: Option<&[u8]>,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    match mode {
        ExecutionMode::Native => match native_plan {
            Some(plan) => execution::execute(plan, params, handle),
            None => Err(ExplainError::InvalidPlan(
                "native execution mode requires a native explain plan".to_string(),
            )),
        },
        ExecutionMode::Plexus => match serialized_plan_bytes {
            Some(bytes) => execute_serialized_plan(bytes, params, handle),
            None => Err(ExplainError::SerializedPlanMalformed(
                "plexus execution mode requires serialized plan bytes".to_string(),
            )),
        },
    }
}