use serde::Serialize;
use sha2::{Digest, Sha256};
use crate::{EngineCapabilities, EMBEDDED_GRAPH_RAG_PROFILE, PLEXUS_CAPABILITY_REJECTION_SCENARIO};
use plexus_serde::deserialize_plan;
/// Identifier of the flagship GraphRAG reference topology. It is reported in the
/// reference contract and in every cache descriptor, and forms the second segment
/// of compiled-plan cache keys.
pub const FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY: &str = "flagship-graphrag-reference-v0";
/// Release-candidate label reported in the `release_candidate` field of the
/// reference contract.
pub const FLAGSHIP_PHASE_3_RELEASE_CANDIDATE: &str = "phase-3-reference-topology";
/// Compatibility profile name reported by the reference contract and by cache
/// descriptors.
pub const FLAGSHIP_COMPATIBILITY_PROFILE: &str = "core-read";
/// Schema identifier stamped on the compiled-plan cache contract and on every
/// cache descriptor.
pub const COMPILED_PLAN_CACHE_SCHEMA: &str = "plexus.compiled-plan-cache.v1";
/// Value of the `cache_role` field in the compiled-plan cache contract.
pub const RHODIUM_COMPILED_PLAN_CACHE_ROLE: &str = "rhodium-compiled-plan-cache";
/// Namespace prefix; it is the leading segment of the key template and of every
/// generated cache key.
pub const COMPILED_PLAN_CACHE_NAMESPACE: &str = "plexus/compiled-plan";
/// Serializable description of how compiled-plan cache entries are identified
/// and invalidated.
///
/// An instance is produced as part of [`flagship_graph_rag_reference_contract`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct CompiledPlanCacheContract {
/// Schema identifier of the cache contract.
pub schema: String,
/// Namespace under which cache keys are generated.
pub cache_namespace: String,
/// Role label of the cache.
pub cache_role: String,
/// Names of the inputs that make up a cache entry's identity.
pub identity_inputs: Vec<String>,
/// Human-readable list of inputs whose change invalidates a cache entry.
pub invalidation_inputs: Vec<String>,
/// Cache key layout with `{placeholder}` markers for the per-request segments.
pub key_template: String,
}
/// Serializable top-level contract for the flagship GraphRAG reference topology.
///
/// Built by [`flagship_graph_rag_reference_contract`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct FlagshipGraphRagReferenceContract {
/// Topology identifier.
pub topology: String,
/// Release-candidate label for this contract.
pub release_candidate: String,
/// Name of the serialized plan profile the topology uses.
pub serialized_plan_profile: String,
/// Name of the compatibility profile the topology uses.
pub compatibility_profile: String,
/// Name of the capability rejection scenario associated with the topology.
pub capability_rejection_scenario: String,
/// Contract describing the compiled-plan cache.
pub compiled_plan_cache: CompiledPlanCacheContract,
}
/// Caller-supplied labels that select a compiled-plan cache entry.
///
/// [`build_compiled_plan_cache_descriptor`] sanitizes each label before placing
/// it in the descriptor and the cache key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompiledPlanCacheRequest<'a> {
/// Label of the execution engine.
pub execution_engine: &'a str,
/// Label of the selected execution pipeline.
pub pipeline: &'a str,
/// Label of the declared consumer.
pub declared_consumer: &'a str,
}
/// Serializable description of one compiled-plan cache entry, including the
/// resolved cache key.
///
/// Produced by [`build_compiled_plan_cache_descriptor`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct CompiledPlanCacheDescriptor {
/// Schema identifier of the cache contract.
pub schema: String,
/// Topology identifier.
pub topology: String,
/// Name of the serialized plan profile.
pub serialized_plan_profile: String,
/// Name of the compatibility profile.
pub compatibility_profile: String,
/// Sanitized execution engine label from the request.
pub execution_engine: String,
/// Sanitized pipeline label from the request.
pub pipeline: String,
/// Sanitized declared-consumer label from the request.
pub declared_consumer: String,
/// Major plan-format version read from the deserialized plan.
pub plan_format_major: u32,
/// Minor plan-format version read from the deserialized plan.
pub plan_format_minor: u32,
/// Lowercase hex SHA-256 of the serialized plan bytes.
pub plan_fingerprint_sha256: String,
/// Lowercase hex SHA-256 of the JSON-serialized capability document.
pub capability_fingerprint_sha256: String,
/// Namespace under which the cache key was generated.
pub cache_namespace: String,
/// Fully resolved, `/`-separated cache key.
pub cache_key: String,
/// Invalidation inputs copied from the reference contract.
pub invalidation_inputs: Vec<String>,
}
/// Builds the contract for the flagship GraphRAG reference topology.
///
/// The returned value is assembled entirely from constants: topology and
/// release-candidate labels, the serialized-plan and compatibility profiles,
/// the capability rejection scenario, and the nested compiled-plan cache
/// contract (schema, namespace, role, identity inputs, invalidation inputs and
/// the cache key template).
pub fn flagship_graph_rag_reference_contract() -> FlagshipGraphRagReferenceContract {
    /// Converts a table of string slices into owned strings, preserving order.
    fn owned_labels(labels: &[&str]) -> Vec<String> {
        labels.iter().copied().map(String::from).collect()
    }

    let identity_inputs = owned_labels(&[
        "topology",
        "serialized_plan_profile",
        "compatibility_profile",
        "plan_format",
        "execution_engine",
        "pipeline",
        "declared_consumer",
        "plan_fingerprint_sha256",
        "capability_fingerprint_sha256",
    ]);

    let invalidation_inputs = owned_labels(&[
        "serialized plan bytes",
        "selected execution pipeline",
        "declared consumer label",
        "declared capability document",
        "topology contract revision",
        "Plexus plan-format major/minor",
    ]);

    // The first three segments are fixed for this topology; the doubled braces
    // emit literal `{placeholder}` markers for the per-request segments.
    let key_template = format!(
        "{}/{}/{}/{{pipeline}}/{{execution_engine}}/{{declared_consumer}}/{{plan_sha256}}/{{capability_sha256}}/v{{major}}.{{minor}}",
        COMPILED_PLAN_CACHE_NAMESPACE,
        FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
        EMBEDDED_GRAPH_RAG_PROFILE
    );

    let compiled_plan_cache = CompiledPlanCacheContract {
        schema: String::from(COMPILED_PLAN_CACHE_SCHEMA),
        cache_namespace: String::from(COMPILED_PLAN_CACHE_NAMESPACE),
        cache_role: String::from(RHODIUM_COMPILED_PLAN_CACHE_ROLE),
        identity_inputs,
        invalidation_inputs,
        key_template,
    };

    FlagshipGraphRagReferenceContract {
        topology: String::from(FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY),
        release_candidate: String::from(FLAGSHIP_PHASE_3_RELEASE_CANDIDATE),
        serialized_plan_profile: EMBEDDED_GRAPH_RAG_PROFILE.to_string(),
        compatibility_profile: String::from(FLAGSHIP_COMPATIBILITY_PROFILE),
        capability_rejection_scenario: PLEXUS_CAPABILITY_REJECTION_SCENARIO.to_string(),
        compiled_plan_cache,
    }
}
pub fn build_compiled_plan_cache_descriptor(
plan_bytes: &[u8],
capabilities: &EngineCapabilities,
request: CompiledPlanCacheRequest<'_>,
) -> Result<CompiledPlanCacheDescriptor, plexus_serde::SerdeError> {
let plan = deserialize_plan(plan_bytes)?;
let plan_fingerprint_sha256 = hex_sha256(plan_bytes);
let capability_fingerprint_sha256 = hex_sha256(
serde_json::to_vec(&capabilities.to_document())
.expect("serializing EngineCapabilityDocument should not fail")
.as_slice(),
);
let declared_consumer = sanitize_segment(request.declared_consumer);
let execution_engine = sanitize_segment(request.execution_engine);
let pipeline = sanitize_segment(request.pipeline);
let cache_key = format!(
"{}/{}/{}/{}/{}/{}/{}/{}/v{}.{}",
COMPILED_PLAN_CACHE_NAMESPACE,
FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
EMBEDDED_GRAPH_RAG_PROFILE,
pipeline,
execution_engine,
declared_consumer,
plan_fingerprint_sha256,
capability_fingerprint_sha256,
plan.version.major,
plan.version.minor,
);
Ok(CompiledPlanCacheDescriptor {
schema: COMPILED_PLAN_CACHE_SCHEMA.to_string(),
topology: FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY.to_string(),
serialized_plan_profile: EMBEDDED_GRAPH_RAG_PROFILE.to_string(),
compatibility_profile: FLAGSHIP_COMPATIBILITY_PROFILE.to_string(),
execution_engine,
pipeline,
declared_consumer,
plan_format_major: plan.version.major,
plan_format_minor: plan.version.minor,
plan_fingerprint_sha256,
capability_fingerprint_sha256,
cache_namespace: COMPILED_PLAN_CACHE_NAMESPACE.to_string(),
cache_key,
invalidation_inputs: flagship_graph_rag_reference_contract()
.compiled_plan_cache
.invalidation_inputs,
})
}
/// Normalizes a caller-supplied label into a single cache-key segment.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced by `-`. An empty input yields `"unknown"`.
fn sanitize_segment(value: &str) -> String {
    // Each input character maps to exactly one output character, so the result
    // is empty only when the input is.
    if value.is_empty() {
        return "unknown".to_string();
    }
    value
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '-' | '_') {
                c
            } else {
                '-'
            }
        })
        .collect()
}
/// Returns the SHA-256 digest of `bytes` as a lowercase hexadecimal string.
///
/// Each digest byte is expanded into two hex digits through a nibble lookup
/// table and pushed straight into the preallocated buffer, so no temporary
/// `String` is created per byte.
fn hex_sha256(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let digest = Sha256::digest(bytes);
    let mut out = String::with_capacity(digest.len() * 2);
    for byte in digest {
        // High nibble first, matching the `{:02x}` formatting of a byte.
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
// Unit tests for the reference contract and the compiled-plan cache descriptor.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
EngineCapabilities, ExprKind, OpKind, PlanSemver, VersionRange, EMBEDDED_GRAPH_RAG_PROFILE,
};
use plexus_serde::{
serialize_plan, Expr, Op, Plan, VectorMetric, Version, PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR,
};
use std::collections::BTreeSet;
// Capabilities fixture: declares exactly the op and expression kinds used by
// `sample_plan_bytes`, with every graph-related flag disabled. The version
// range is built from PLAN_FORMAT_MAJOR.0.0 and
// PLAN_FORMAT_MAJOR.PLAN_FORMAT_MINOR.u32::MAX.
fn sample_capabilities() -> EngineCapabilities {
EngineCapabilities {
version_range: VersionRange::new(
PlanSemver::new(PLAN_FORMAT_MAJOR, 0, 0),
PlanSemver::new(PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR, u32::MAX),
),
supported_ops: BTreeSet::from([
OpKind::ConstRow,
OpKind::VectorScan,
OpKind::Rerank,
OpKind::Return,
]),
supported_exprs: BTreeSet::from([ExprKind::ColRef, ExprKind::VectorSimilarity]),
supports_graph_ref: false,
supports_multi_graph: false,
supports_graph_params: false,
}
}
// Plan fixture: a four-op chain (ConstRow -> VectorScan -> Rerank -> Return)
// at the current plan-format version, serialized with `serialize_plan`.
fn sample_plan_bytes() -> Vec<u8> {
let plan = Plan {
version: Version {
major: PLAN_FORMAT_MAJOR,
minor: PLAN_FORMAT_MINOR,
patch: 0,
producer: "plexus-engine-reference-topology-test".to_string(),
},
ops: vec![
Op::ConstRow,
Op::VectorScan {
input: 0,
collection: "docs".to_string(),
query_vector: Expr::ColRef { idx: 0 },
metric: VectorMetric::Cosine,
top_k: 5,
approx_hint: false,
schema: Vec::new(),
},
Op::Rerank {
input: 1,
score_expr: Expr::VectorSimilarity {
metric: VectorMetric::Cosine,
lhs: Box::new(Expr::ColRef { idx: 1 }),
rhs: Box::new(Expr::ColRef { idx: 2 }),
},
top_k: 3,
schema: Vec::new(),
},
Op::Return { input: 2 },
],
root_op: 3,
};
serialize_plan(&plan).expect("serialize plan")
}
// The contract must report the embedded GraphRAG profile, the flagship
// compatibility profile, and the compiled-plan cache schema constant.
#[test]
fn flagship_contract_reuses_embedded_profile_and_core_read_baseline() {
let contract = flagship_graph_rag_reference_contract();
assert_eq!(contract.serialized_plan_profile, EMBEDDED_GRAPH_RAG_PROFILE);
assert_eq!(
contract.compatibility_profile,
FLAGSHIP_COMPATIBILITY_PROFILE
);
assert_eq!(
contract.compiled_plan_cache.schema,
COMPILED_PLAN_CACHE_SCHEMA
);
}
// Building the descriptor twice from identical inputs must give equal
// descriptors, and the cache key must start with the cache namespace.
#[test]
fn compiled_plan_cache_descriptor_is_deterministic() {
let capabilities = sample_capabilities();
let plan_bytes = sample_plan_bytes();
let request = CompiledPlanCacheRequest {
execution_engine: "reference",
pipeline: "optimized",
declared_consumer: "iridium-runtime",
};
let first =
build_compiled_plan_cache_descriptor(&plan_bytes, &capabilities, request.clone())
.expect("first descriptor");
let second = build_compiled_plan_cache_descriptor(&plan_bytes, &capabilities, request)
.expect("second descriptor");
assert_eq!(first, second);
assert!(first.cache_key.starts_with(COMPILED_PLAN_CACHE_NAMESPACE));
assert_eq!(first.serialized_plan_profile, EMBEDDED_GRAPH_RAG_PROFILE);
}
// Spaces and `/` in request labels must be replaced by `-` in the descriptor.
#[test]
fn compiled_plan_cache_descriptor_sanitizes_segments() {
let descriptor = build_compiled_plan_cache_descriptor(
&sample_plan_bytes(),
&sample_capabilities(),
CompiledPlanCacheRequest {
execution_engine: "reference engine",
pipeline: "optimized/profile",
declared_consumer: "iridium runtime",
},
)
.expect("descriptor");
assert_eq!(descriptor.execution_engine, "reference-engine");
assert_eq!(descriptor.pipeline, "optimized-profile");
assert_eq!(descriptor.declared_consumer, "iridium-runtime");
}
}