// plexus-engine 0.3.6
//
// Engine integration traits for consuming Plexus plans.
// (See the crate documentation for the full API reference.)
use serde::Serialize;
use sha2::{Digest, Sha256};

use crate::{EngineCapabilities, EMBEDDED_GRAPH_RAG_PROFILE, PLEXUS_CAPABILITY_REJECTION_SCENARIO};
use plexus_serde::deserialize_plan;

/// Identifier of the flagship GraphRAG reference topology; also a fixed segment of every
/// compiled-plan cache key built in this module.
pub const FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY: &str = "flagship-graphrag-reference-v0";
/// Release-candidate label reported by [`flagship_graph_rag_reference_contract`].
pub const FLAGSHIP_PHASE_3_RELEASE_CANDIDATE: &str = "phase-3-reference-topology";
/// Compatibility profile advertised by the flagship contract and by every cache descriptor.
pub const FLAGSHIP_COMPATIBILITY_PROFILE: &str = "core-read";
/// Schema tag stamped on both the cache contract and each cache descriptor.
pub const COMPILED_PLAN_CACHE_SCHEMA: &str = "plexus.compiled-plan-cache.v1";
/// Role name reported for the compiled-plan cache in the flagship contract.
pub const RHODIUM_COMPILED_PLAN_CACHE_ROLE: &str = "rhodium-compiled-plan-cache";
/// Leading prefix of every compiled-plan cache key. Note that it already contains a `/`,
/// so it contributes two path segments to the key.
pub const COMPILED_PLAN_CACHE_NAMESPACE: &str = "plexus/compiled-plan";

/// Published description of how compiled-plan cache entries are identified and keyed.
///
/// Field order matters: it determines the derived `Serialize` and `Debug` output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct CompiledPlanCacheContract {
    /// Schema tag (`COMPILED_PLAN_CACHE_SCHEMA` in the flagship contract).
    pub schema: String,
    /// Prefix under which all cache keys live (`COMPILED_PLAN_CACHE_NAMESPACE`).
    pub cache_namespace: String,
    /// Role name of the cache (`RHODIUM_COMPILED_PLAN_CACHE_ROLE` in the flagship contract).
    pub cache_role: String,
    /// Names of the descriptor properties that together form a cache entry's identity.
    pub identity_inputs: Vec<String>,
    /// Human-readable list of inputs whose change must yield a different cache entry.
    pub invalidation_inputs: Vec<String>,
    /// Cache-key pattern with `{placeholder}` segments for the per-request values.
    pub key_template: String,
}

/// Published description of the flagship GraphRAG reference topology, as returned by
/// [`flagship_graph_rag_reference_contract`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct FlagshipGraphRagReferenceContract {
    /// Topology identifier (`FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY`).
    pub topology: String,
    /// Release-candidate label (`FLAGSHIP_PHASE_3_RELEASE_CANDIDATE`).
    pub release_candidate: String,
    /// Serialized-plan profile reused by this topology (the crate's `EMBEDDED_GRAPH_RAG_PROFILE`).
    pub serialized_plan_profile: String,
    /// Compatibility profile (`FLAGSHIP_COMPATIBILITY_PROFILE`).
    pub compatibility_profile: String,
    /// Capability-rejection scenario name (the crate's `PLEXUS_CAPABILITY_REJECTION_SCENARIO`).
    pub capability_rejection_scenario: String,
    /// Contract for the compiled-plan cache used with this topology.
    pub compiled_plan_cache: CompiledPlanCacheContract,
}

/// Caller-supplied labels that select a compiled-plan cache entry.
///
/// Each label is passed through `sanitize_segment` before it is used as a cache-key segment.
/// Every character outside `[A-Za-z0-9_-]` becomes `-`, and an empty label becomes `unknown`.
///
/// NOTE(review): the sanitization is lossy. Labels that differ only in disallowed characters
/// (e.g. `"reference engine"` vs `"reference-engine"`) map to the same segment and therefore
/// share a cache key. Callers should keep labels within `[A-Za-z0-9_-]` if that matters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompiledPlanCacheRequest<'a> {
    /// Label of the engine that will execute the plan.
    pub execution_engine: &'a str,
    /// Label of the selected execution pipeline.
    pub pipeline: &'a str,
    /// Label identifying the consumer that declared the request.
    pub declared_consumer: &'a str,
}

/// Fully resolved cache identity for one serialized plan, as produced by
/// [`build_compiled_plan_cache_descriptor`].
///
/// Field order matters: it determines the derived `Serialize` and `Debug` output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct CompiledPlanCacheDescriptor {
    /// Schema tag (`COMPILED_PLAN_CACHE_SCHEMA`).
    pub schema: String,
    /// Topology identifier (`FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY`).
    pub topology: String,
    /// Serialized-plan profile (the crate's `EMBEDDED_GRAPH_RAG_PROFILE`).
    pub serialized_plan_profile: String,
    /// Compatibility profile (`FLAGSHIP_COMPATIBILITY_PROFILE`).
    pub compatibility_profile: String,
    /// Sanitized execution-engine label from the request.
    pub execution_engine: String,
    /// Sanitized pipeline label from the request.
    pub pipeline: String,
    /// Sanitized declared-consumer label from the request.
    pub declared_consumer: String,
    /// Major plan-format version read from the decoded plan.
    pub plan_format_major: u32,
    /// Minor plan-format version read from the decoded plan.
    pub plan_format_minor: u32,
    /// Lowercase hex SHA-256 of the raw serialized plan bytes.
    pub plan_fingerprint_sha256: String,
    /// Lowercase hex SHA-256 of the JSON-encoded engine capability document.
    pub capability_fingerprint_sha256: String,
    /// Key prefix (`COMPILED_PLAN_CACHE_NAMESPACE`).
    pub cache_namespace: String,
    /// Complete cache key, rooted at `cache_namespace`.
    pub cache_key: String,
    /// Invalidation inputs copied from the flagship contract's cache section.
    pub invalidation_inputs: Vec<String>,
}

/// Returns the flagship GraphRAG reference-topology contract.
///
/// The contract fixes the following values:
///
/// - topology;
/// - release candidate;
/// - serialized-plan profile;
/// - compatibility profile;
/// - capability-rejection scenario.
///
/// It also describes the compiled-plan cache:
///
/// - which inputs make up an entry's identity;
/// - which changes invalidate an entry;
/// - the key template that `build_compiled_plan_cache_descriptor` fills in.
pub fn flagship_graph_rag_reference_contract() -> FlagshipGraphRagReferenceContract {
    /// Copies a list of static labels into owned strings.
    fn owned(labels: &[&str]) -> Vec<String> {
        labels.iter().copied().map(String::from).collect()
    }

    // The three fixed leading segments of every key; the rest are `{placeholder}`s that are
    // escaped as `{{...}}` so `format!` emits them literally.
    let key_prefix = [
        COMPILED_PLAN_CACHE_NAMESPACE,
        FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
        EMBEDDED_GRAPH_RAG_PROFILE,
    ]
    .join("/");
    let key_template = format!(
        "{key_prefix}/{{pipeline}}/{{execution_engine}}/{{declared_consumer}}/{{plan_sha256}}/{{capability_sha256}}/v{{major}}.{{minor}}"
    );

    let compiled_plan_cache = CompiledPlanCacheContract {
        schema: String::from(COMPILED_PLAN_CACHE_SCHEMA),
        cache_namespace: String::from(COMPILED_PLAN_CACHE_NAMESPACE),
        cache_role: String::from(RHODIUM_COMPILED_PLAN_CACHE_ROLE),
        identity_inputs: owned(&[
            "topology",
            "serialized_plan_profile",
            "compatibility_profile",
            "plan_format",
            "execution_engine",
            "pipeline",
            "declared_consumer",
            "plan_fingerprint_sha256",
            "capability_fingerprint_sha256",
        ]),
        invalidation_inputs: owned(&[
            "serialized plan bytes",
            "selected execution pipeline",
            "declared consumer label",
            "declared capability document",
            "topology contract revision",
            "Plexus plan-format major/minor",
        ]),
        key_template,
    };

    FlagshipGraphRagReferenceContract {
        topology: String::from(FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY),
        release_candidate: String::from(FLAGSHIP_PHASE_3_RELEASE_CANDIDATE),
        serialized_plan_profile: String::from(EMBEDDED_GRAPH_RAG_PROFILE),
        compatibility_profile: String::from(FLAGSHIP_COMPATIBILITY_PROFILE),
        capability_rejection_scenario: String::from(PLEXUS_CAPABILITY_REJECTION_SCENARIO),
        compiled_plan_cache,
    }
}

/// Derives the compiled-plan cache descriptor, including its cache key, for one serialized plan.
///
/// The key has the shape
/// `namespace/topology/profile/{pipeline}/{execution_engine}/{declared_consumer}/{plan_sha256}/{capability_sha256}/v{major}.{minor}`.
///
/// - The request labels are sanitized so they cannot inject extra `/` separators.
/// - The plan fingerprint is SHA-256 over the raw `plan_bytes`.
/// - The capability fingerprint is SHA-256 over the JSON encoding of
///   `capabilities.to_document()`.
///
/// # Errors
///
/// Returns the error from `deserialize_plan` when `plan_bytes` cannot be decoded as a plan.
///
/// # Panics
///
/// Panics if the capability document cannot be serialized to JSON, which is treated as an
/// invariant violation.
pub fn build_compiled_plan_cache_descriptor(
    plan_bytes: &[u8],
    capabilities: &EngineCapabilities,
    request: CompiledPlanCacheRequest<'_>,
) -> Result<CompiledPlanCacheDescriptor, plexus_serde::SerdeError> {
    // Decoding validates the bytes; only the format version is needed afterwards.
    let plan = deserialize_plan(plan_bytes)?;
    let (major, minor) = (plan.version.major, plan.version.minor);

    let plan_fingerprint_sha256 = hex_sha256(plan_bytes);
    let capability_json = serde_json::to_vec(&capabilities.to_document())
        .expect("serializing EngineCapabilityDocument should not fail");
    let capability_fingerprint_sha256 = hex_sha256(&capability_json);

    // These labels become path segments of the key, so normalize them first.
    let pipeline = sanitize_segment(request.pipeline);
    let execution_engine = sanitize_segment(request.execution_engine);
    let declared_consumer = sanitize_segment(request.declared_consumer);

    let key_path = [
        COMPILED_PLAN_CACHE_NAMESPACE,
        FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
        EMBEDDED_GRAPH_RAG_PROFILE,
        pipeline.as_str(),
        execution_engine.as_str(),
        declared_consumer.as_str(),
        plan_fingerprint_sha256.as_str(),
        capability_fingerprint_sha256.as_str(),
    ]
    .join("/");
    let cache_key = format!("{key_path}/v{major}.{minor}");

    let invalidation_inputs = flagship_graph_rag_reference_contract()
        .compiled_plan_cache
        .invalidation_inputs;

    Ok(CompiledPlanCacheDescriptor {
        schema: String::from(COMPILED_PLAN_CACHE_SCHEMA),
        topology: String::from(FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY),
        serialized_plan_profile: String::from(EMBEDDED_GRAPH_RAG_PROFILE),
        compatibility_profile: String::from(FLAGSHIP_COMPATIBILITY_PROFILE),
        execution_engine,
        pipeline,
        declared_consumer,
        plan_format_major: major,
        plan_format_minor: minor,
        plan_fingerprint_sha256,
        capability_fingerprint_sha256,
        cache_namespace: String::from(COMPILED_PLAN_CACHE_NAMESPACE),
        cache_key,
        invalidation_inputs,
    })
}

/// Normalizes a caller-supplied label into a single cache-key path segment.
///
/// ASCII letters, digits, `-` and `_` are kept. Every other character, including `/` and any
/// non-ASCII char, becomes one `-`. An empty label becomes `"unknown"`, so the key never
/// contains an empty segment.
fn sanitize_segment(value: &str) -> String {
    // Each char maps to exactly one output char, so the result is empty only for empty input.
    if value.is_empty() {
        return "unknown".to_string();
    }
    value
        .chars()
        .map(|c| match c {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => c,
            _ => '-',
        })
        .collect()
}

/// Returns the SHA-256 digest of `bytes` as a 64-character lowercase hexadecimal string.
fn hex_sha256(bytes: &[u8]) -> String {
    const NIBBLES: &[u8; 16] = b"0123456789abcdef";
    // Emit the high nibble then the low nibble of each digest byte, matching `{:02x}`.
    Sha256::digest(bytes)
        .iter()
        .flat_map(|&byte| [NIBBLES[usize::from(byte >> 4)], NIBBLES[usize::from(byte & 0x0f)]])
        .map(char::from)
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        EngineCapabilities, ExprKind, OpKind, PlanSemver, VersionRange, EMBEDDED_GRAPH_RAG_PROFILE,
    };
    use plexus_serde::{
        serialize_plan, Expr, Op, Plan, VectorMetric, Version, PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR,
    };
    use std::collections::BTreeSet;

    /// Capabilities covering exactly the ops/exprs used by `sample_plan_bytes`.
    ///
    /// Accepts plan versions from `PLAN_FORMAT_MAJOR.0.0` up to the current major/minor, with
    /// any patch.
    fn sample_capabilities() -> EngineCapabilities {
        EngineCapabilities {
            version_range: VersionRange::new(
                PlanSemver::new(PLAN_FORMAT_MAJOR, 0, 0),
                PlanSemver::new(PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR, u32::MAX),
            ),
            supported_ops: BTreeSet::from([
                OpKind::ConstRow,
                OpKind::VectorScan,
                OpKind::Rerank,
                OpKind::Return,
            ]),
            supported_exprs: BTreeSet::from([ExprKind::ColRef, ExprKind::VectorSimilarity]),
            supports_graph_ref: false,
            supports_multi_graph: false,
            supports_graph_params: false,
        }
    }

    /// Serializes a minimal ConstRow -> VectorScan -> Rerank -> Return plan at the current
    /// plan-format major/minor.
    fn sample_plan_bytes() -> Vec<u8> {
        let plan = Plan {
            version: Version {
                major: PLAN_FORMAT_MAJOR,
                minor: PLAN_FORMAT_MINOR,
                patch: 0,
                producer: "plexus-engine-reference-topology-test".to_string(),
            },
            ops: vec![
                Op::ConstRow,
                Op::VectorScan {
                    input: 0,
                    collection: "docs".to_string(),
                    query_vector: Expr::ColRef { idx: 0 },
                    metric: VectorMetric::Cosine,
                    top_k: 5,
                    approx_hint: false,
                    schema: Vec::new(),
                },
                Op::Rerank {
                    input: 1,
                    score_expr: Expr::VectorSimilarity {
                        metric: VectorMetric::Cosine,
                        lhs: Box::new(Expr::ColRef { idx: 1 }),
                        rhs: Box::new(Expr::ColRef { idx: 2 }),
                    },
                    top_k: 3,
                    schema: Vec::new(),
                },
                Op::Return { input: 2 },
            ],
            root_op: 3,
        };
        serialize_plan(&plan).expect("serialize plan")
    }

    #[test]
    fn flagship_contract_reuses_embedded_profile_and_core_read_baseline() {
        let contract = flagship_graph_rag_reference_contract();
        assert_eq!(contract.serialized_plan_profile, EMBEDDED_GRAPH_RAG_PROFILE);
        assert_eq!(
            contract.compatibility_profile,
            FLAGSHIP_COMPATIBILITY_PROFILE
        );
        assert_eq!(
            contract.compiled_plan_cache.schema,
            COMPILED_PLAN_CACHE_SCHEMA
        );
    }

    #[test]
    fn compiled_plan_cache_descriptor_is_deterministic() {
        let capabilities = sample_capabilities();
        let plan_bytes = sample_plan_bytes();
        let request = CompiledPlanCacheRequest {
            execution_engine: "reference",
            pipeline: "optimized",
            declared_consumer: "iridium-runtime",
        };

        let first =
            build_compiled_plan_cache_descriptor(&plan_bytes, &capabilities, request.clone())
                .expect("first descriptor");
        let second = build_compiled_plan_cache_descriptor(&plan_bytes, &capabilities, request)
            .expect("second descriptor");

        assert_eq!(first, second);
        assert!(first.cache_key.starts_with(COMPILED_PLAN_CACHE_NAMESPACE));
        assert_eq!(first.serialized_plan_profile, EMBEDDED_GRAPH_RAG_PROFILE);
    }

    #[test]
    fn compiled_plan_cache_descriptor_sanitizes_segments() {
        let descriptor = build_compiled_plan_cache_descriptor(
            &sample_plan_bytes(),
            &sample_capabilities(),
            CompiledPlanCacheRequest {
                execution_engine: "reference engine",
                pipeline: "optimized/profile",
                declared_consumer: "iridium runtime",
            },
        )
        .expect("descriptor");

        assert_eq!(descriptor.execution_engine, "reference-engine");
        assert_eq!(descriptor.pipeline, "optimized-profile");
        assert_eq!(descriptor.declared_consumer, "iridium-runtime");
    }

    /// The published `key_template` and the descriptor's `cache_key` come from two independent
    /// `format!` calls. Filling the template must reproduce the key exactly, so the two cannot
    /// silently drift apart.
    #[test]
    fn compiled_plan_cache_key_matches_published_template() {
        let contract = flagship_graph_rag_reference_contract();
        let descriptor = build_compiled_plan_cache_descriptor(
            &sample_plan_bytes(),
            &sample_capabilities(),
            CompiledPlanCacheRequest {
                execution_engine: "reference",
                pipeline: "optimized",
                declared_consumer: "iridium-runtime",
            },
        )
        .expect("descriptor");

        let expected_key = contract
            .compiled_plan_cache
            .key_template
            .replace("{pipeline}", &descriptor.pipeline)
            .replace("{execution_engine}", &descriptor.execution_engine)
            .replace("{declared_consumer}", &descriptor.declared_consumer)
            .replace("{plan_sha256}", &descriptor.plan_fingerprint_sha256)
            .replace("{capability_sha256}", &descriptor.capability_fingerprint_sha256)
            .replace("{major}", &descriptor.plan_format_major.to_string())
            .replace("{minor}", &descriptor.plan_format_minor.to_string());

        assert_eq!(descriptor.cache_key, expected_key);
        assert_eq!(descriptor.plan_format_major, PLAN_FORMAT_MAJOR);
        assert_eq!(descriptor.plan_format_minor, PLAN_FORMAT_MINOR);
        assert_eq!(
            descriptor.invalidation_inputs,
            contract.compiled_plan_cache.invalidation_inputs
        );
    }

    /// Both fingerprints are 64 lowercase hex digits, and the plan fingerprint hashes the raw
    /// serialized bytes.
    #[test]
    fn compiled_plan_cache_descriptor_fingerprints_are_lowercase_hex_sha256() {
        let plan_bytes = sample_plan_bytes();
        let descriptor = build_compiled_plan_cache_descriptor(
            &plan_bytes,
            &sample_capabilities(),
            CompiledPlanCacheRequest {
                execution_engine: "reference",
                pipeline: "optimized",
                declared_consumer: "iridium-runtime",
            },
        )
        .expect("descriptor");

        for fingerprint in [
            &descriptor.plan_fingerprint_sha256,
            &descriptor.capability_fingerprint_sha256,
        ] {
            assert_eq!(fingerprint.len(), 64);
            assert!(fingerprint
                .chars()
                .all(|c| matches!(c, '0'..='9' | 'a'..='f')));
        }
        assert_eq!(descriptor.plan_fingerprint_sha256, hex_sha256(&plan_bytes));
    }

    /// Empty labels must not produce empty key segments.
    #[test]
    fn compiled_plan_cache_descriptor_replaces_empty_segments_with_unknown() {
        let descriptor = build_compiled_plan_cache_descriptor(
            &sample_plan_bytes(),
            &sample_capabilities(),
            CompiledPlanCacheRequest {
                execution_engine: "",
                pipeline: "",
                declared_consumer: "",
            },
        )
        .expect("descriptor");

        assert_eq!(descriptor.execution_engine, "unknown");
        assert_eq!(descriptor.pipeline, "unknown");
        assert_eq!(descriptor.declared_consumer, "unknown");
    }
}