Skip to main content

plexus_engine/
reference_topology.rs

1use serde::Serialize;
2use sha2::{Digest, Sha256};
3
4use crate::{EngineCapabilities, EMBEDDED_GRAPH_RAG_PROFILE, PLEXUS_CAPABILITY_REJECTION_SCENARIO};
5use plexus_serde::deserialize_plan;
6
/// Name of the flagship GraphRAG reference topology; also the segment that follows the
/// namespace in compiled-plan cache keys.
pub const FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY: &str = "flagship-graphrag-reference-v0";
/// Release-candidate label reported by the reference contract.
pub const FLAGSHIP_PHASE_3_RELEASE_CANDIDATE: &str = "phase-3-reference-topology";
/// Compatibility profile reported by the reference contract and by cache descriptors.
pub const FLAGSHIP_COMPATIBILITY_PROFILE: &str = "core-read";
/// Schema identifier stamped on the cache contract and on every cache descriptor.
pub const COMPILED_PLAN_CACHE_SCHEMA: &str = "plexus.compiled-plan-cache.v1";
/// Cache role recorded in the compiled-plan cache contract.
pub const RHODIUM_COMPILED_PLAN_CACHE_ROLE: &str = "rhodium-compiled-plan-cache";
/// Namespace prefix that starts every compiled-plan cache key.
pub const COMPILED_PLAN_CACHE_NAMESPACE: &str = "plexus/compiled-plan";
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
15pub struct CompiledPlanCacheContract {
16    pub schema: String,
17    pub cache_namespace: String,
18    pub cache_role: String,
19    pub identity_inputs: Vec<String>,
20    pub invalidation_inputs: Vec<String>,
21    pub key_template: String,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
25pub struct FlagshipGraphRagReferenceContract {
26    pub topology: String,
27    pub release_candidate: String,
28    pub serialized_plan_profile: String,
29    pub compatibility_profile: String,
30    pub capability_rejection_scenario: String,
31    pub compiled_plan_cache: CompiledPlanCacheContract,
32}
33
/// Caller-supplied labels that select a compiled-plan cache entry.
///
/// The labels are borrowed; `build_compiled_plan_cache_descriptor` sanitizes each one before
/// placing it in the descriptor and the cache key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompiledPlanCacheRequest<'a> {
    /// Label of the execution engine.
    pub execution_engine: &'a str,
    /// Label of the execution pipeline.
    pub pipeline: &'a str,
    /// Label of the consumer declared for the compiled plan.
    pub declared_consumer: &'a str,
}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
42pub struct CompiledPlanCacheDescriptor {
43    pub schema: String,
44    pub topology: String,
45    pub serialized_plan_profile: String,
46    pub compatibility_profile: String,
47    pub execution_engine: String,
48    pub pipeline: String,
49    pub declared_consumer: String,
50    pub plan_format_major: u32,
51    pub plan_format_minor: u32,
52    pub plan_fingerprint_sha256: String,
53    pub capability_fingerprint_sha256: String,
54    pub cache_namespace: String,
55    pub cache_key: String,
56    pub invalidation_inputs: Vec<String>,
57}
58
59pub fn flagship_graph_rag_reference_contract() -> FlagshipGraphRagReferenceContract {
60    FlagshipGraphRagReferenceContract {
61        topology: FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY.to_string(),
62        release_candidate: FLAGSHIP_PHASE_3_RELEASE_CANDIDATE.to_string(),
63        serialized_plan_profile: EMBEDDED_GRAPH_RAG_PROFILE.to_string(),
64        compatibility_profile: FLAGSHIP_COMPATIBILITY_PROFILE.to_string(),
65        capability_rejection_scenario: PLEXUS_CAPABILITY_REJECTION_SCENARIO.to_string(),
66        compiled_plan_cache: CompiledPlanCacheContract {
67            schema: COMPILED_PLAN_CACHE_SCHEMA.to_string(),
68            cache_namespace: COMPILED_PLAN_CACHE_NAMESPACE.to_string(),
69            cache_role: RHODIUM_COMPILED_PLAN_CACHE_ROLE.to_string(),
70            identity_inputs: vec![
71                "topology".to_string(),
72                "serialized_plan_profile".to_string(),
73                "compatibility_profile".to_string(),
74                "plan_format".to_string(),
75                "execution_engine".to_string(),
76                "pipeline".to_string(),
77                "declared_consumer".to_string(),
78                "plan_fingerprint_sha256".to_string(),
79                "capability_fingerprint_sha256".to_string(),
80            ],
81            invalidation_inputs: vec![
82                "serialized plan bytes".to_string(),
83                "selected execution pipeline".to_string(),
84                "declared consumer label".to_string(),
85                "declared capability document".to_string(),
86                "topology contract revision".to_string(),
87                "Plexus plan-format major/minor".to_string(),
88            ],
89            key_template: format!(
90                "{}/{}/{}/{{pipeline}}/{{execution_engine}}/{{declared_consumer}}/{{plan_sha256}}/{{capability_sha256}}/v{{major}}.{{minor}}",
91                COMPILED_PLAN_CACHE_NAMESPACE,
92                FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
93                EMBEDDED_GRAPH_RAG_PROFILE
94            ),
95        },
96    }
97}
98
99pub fn build_compiled_plan_cache_descriptor(
100    plan_bytes: &[u8],
101    capabilities: &EngineCapabilities,
102    request: CompiledPlanCacheRequest<'_>,
103) -> Result<CompiledPlanCacheDescriptor, plexus_serde::SerdeError> {
104    let plan = deserialize_plan(plan_bytes)?;
105    let plan_fingerprint_sha256 = hex_sha256(plan_bytes);
106    let capability_fingerprint_sha256 = hex_sha256(
107        serde_json::to_vec(&capabilities.to_document())
108            .expect("serializing EngineCapabilityDocument should not fail")
109            .as_slice(),
110    );
111    let declared_consumer = sanitize_segment(request.declared_consumer);
112    let execution_engine = sanitize_segment(request.execution_engine);
113    let pipeline = sanitize_segment(request.pipeline);
114    let cache_key = format!(
115        "{}/{}/{}/{}/{}/{}/{}/{}/v{}.{}",
116        COMPILED_PLAN_CACHE_NAMESPACE,
117        FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
118        EMBEDDED_GRAPH_RAG_PROFILE,
119        pipeline,
120        execution_engine,
121        declared_consumer,
122        plan_fingerprint_sha256,
123        capability_fingerprint_sha256,
124        plan.version.major,
125        plan.version.minor,
126    );
127
128    Ok(CompiledPlanCacheDescriptor {
129        schema: COMPILED_PLAN_CACHE_SCHEMA.to_string(),
130        topology: FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY.to_string(),
131        serialized_plan_profile: EMBEDDED_GRAPH_RAG_PROFILE.to_string(),
132        compatibility_profile: FLAGSHIP_COMPATIBILITY_PROFILE.to_string(),
133        execution_engine,
134        pipeline,
135        declared_consumer,
136        plan_format_major: plan.version.major,
137        plan_format_minor: plan.version.minor,
138        plan_fingerprint_sha256,
139        capability_fingerprint_sha256,
140        cache_namespace: COMPILED_PLAN_CACHE_NAMESPACE.to_string(),
141        cache_key,
142        invalidation_inputs: flagship_graph_rag_reference_contract()
143            .compiled_plan_cache
144            .invalidation_inputs,
145    })
146}
147
/// Makes `value` safe to use as a single cache-key segment.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character (including
/// `/`, whitespace and non-ASCII characters) is replaced by one `-`. An empty input
/// yields `"unknown"` so the key never contains an empty segment.
fn sanitize_segment(value: &str) -> String {
    // Each input character maps to exactly one output character, so the result can only
    // be empty when the input is.
    if value.is_empty() {
        return "unknown".to_string();
    }
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
                ch
            } else {
                '-'
            }
        })
        .collect()
}
163
164fn hex_sha256(bytes: &[u8]) -> String {
165    let digest = Sha256::digest(bytes);
166    let mut out = String::with_capacity(digest.len() * 2);
167    for byte in digest {
168        out.push_str(&format!("{byte:02x}"));
169    }
170    out
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        EngineCapabilities, ExprKind, OpKind, PlanSemver, VersionRange, EMBEDDED_GRAPH_RAG_PROFILE,
    };
    use plexus_serde::{
        serialize_plan, Expr, Op, Plan, VectorMetric, Version, PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR,
    };
    use std::collections::BTreeSet;

    /// Capabilities covering exactly the ops and expressions used by `sample_plan_bytes`.
    fn sample_capabilities() -> EngineCapabilities {
        EngineCapabilities {
            version_range: VersionRange::new(
                PlanSemver::new(PLAN_FORMAT_MAJOR, 0, 0),
                PlanSemver::new(PLAN_FORMAT_MAJOR, PLAN_FORMAT_MINOR, u32::MAX),
            ),
            supported_ops: BTreeSet::from([
                OpKind::ConstRow,
                OpKind::VectorScan,
                OpKind::Rerank,
                OpKind::Return,
            ]),
            supported_exprs: BTreeSet::from([ExprKind::ColRef, ExprKind::VectorSimilarity]),
            supports_graph_ref: false,
            supports_multi_graph: false,
            supports_graph_params: false,
        }
    }

    /// Serialized four-op plan: const row -> vector scan -> rerank -> return.
    fn sample_plan_bytes() -> Vec<u8> {
        let plan = Plan {
            version: Version {
                major: PLAN_FORMAT_MAJOR,
                minor: PLAN_FORMAT_MINOR,
                patch: 0,
                producer: "plexus-engine-reference-topology-test".to_string(),
            },
            ops: vec![
                Op::ConstRow,
                Op::VectorScan {
                    input: 0,
                    collection: "docs".to_string(),
                    query_vector: Expr::ColRef { idx: 0 },
                    metric: VectorMetric::Cosine,
                    top_k: 5,
                    approx_hint: false,
                    schema: Vec::new(),
                },
                Op::Rerank {
                    input: 1,
                    score_expr: Expr::VectorSimilarity {
                        metric: VectorMetric::Cosine,
                        lhs: Box::new(Expr::ColRef { idx: 1 }),
                        rhs: Box::new(Expr::ColRef { idx: 2 }),
                    },
                    top_k: 3,
                    schema: Vec::new(),
                },
                Op::Return { input: 2 },
            ],
            root_op: 3,
        };
        serialize_plan(&plan).expect("serialize plan")
    }

    #[test]
    fn flagship_contract_reuses_embedded_profile_and_core_read_baseline() {
        let contract = flagship_graph_rag_reference_contract();
        assert_eq!(contract.serialized_plan_profile, EMBEDDED_GRAPH_RAG_PROFILE);
        assert_eq!(
            contract.compatibility_profile,
            FLAGSHIP_COMPATIBILITY_PROFILE
        );
        assert_eq!(
            contract.compiled_plan_cache.schema,
            COMPILED_PLAN_CACHE_SCHEMA
        );
    }

    #[test]
    fn compiled_plan_cache_descriptor_is_deterministic() {
        let capabilities = sample_capabilities();
        let plan_bytes = sample_plan_bytes();
        let request = CompiledPlanCacheRequest {
            execution_engine: "reference",
            pipeline: "optimized",
            declared_consumer: "iridium-runtime",
        };

        let first =
            build_compiled_plan_cache_descriptor(&plan_bytes, &capabilities, request.clone())
                .expect("first descriptor");
        let second = build_compiled_plan_cache_descriptor(&plan_bytes, &capabilities, request)
            .expect("second descriptor");

        assert_eq!(first, second);
        assert!(first.cache_key.starts_with(COMPILED_PLAN_CACHE_NAMESPACE));
        assert_eq!(first.serialized_plan_profile, EMBEDDED_GRAPH_RAG_PROFILE);

        // Both fingerprints are hex-encoded SHA-256 digests and appear in the key.
        assert_eq!(first.plan_fingerprint_sha256.len(), 64);
        assert_eq!(first.capability_fingerprint_sha256.len(), 64);
        assert!(first
            .cache_key
            .contains(first.plan_fingerprint_sha256.as_str()));
        assert!(first
            .cache_key
            .contains(first.capability_fingerprint_sha256.as_str()));

        // The key ends with the plan-format version reported by the descriptor.
        let version_suffix = format!(
            "/v{}.{}",
            first.plan_format_major, first.plan_format_minor
        );
        assert!(first.cache_key.ends_with(version_suffix.as_str()));

        assert_eq!(
            first.invalidation_inputs,
            flagship_graph_rag_reference_contract()
                .compiled_plan_cache
                .invalidation_inputs
        );
    }

    #[test]
    fn compiled_plan_cache_descriptor_sanitizes_segments() {
        let descriptor = build_compiled_plan_cache_descriptor(
            &sample_plan_bytes(),
            &sample_capabilities(),
            CompiledPlanCacheRequest {
                execution_engine: "reference engine",
                pipeline: "optimized/profile",
                declared_consumer: "iridium runtime",
            },
        )
        .expect("descriptor");

        assert_eq!(descriptor.execution_engine, "reference-engine");
        assert_eq!(descriptor.pipeline, "optimized-profile");
        assert_eq!(descriptor.declared_consumer, "iridium-runtime");

        // The key carries the sanitized labels in pipeline/engine/consumer order.
        assert!(descriptor
            .cache_key
            .contains("/optimized-profile/reference-engine/iridium-runtime/"));
    }

    #[test]
    fn compiled_plan_cache_key_varies_with_declared_consumer() {
        let capabilities = sample_capabilities();
        let plan_bytes = sample_plan_bytes();

        let for_runtime = build_compiled_plan_cache_descriptor(
            &plan_bytes,
            &capabilities,
            CompiledPlanCacheRequest {
                execution_engine: "reference",
                pipeline: "optimized",
                declared_consumer: "iridium-runtime",
            },
        )
        .expect("runtime descriptor");
        let for_tooling = build_compiled_plan_cache_descriptor(
            &plan_bytes,
            &capabilities,
            CompiledPlanCacheRequest {
                execution_engine: "reference",
                pipeline: "optimized",
                declared_consumer: "iridium-tooling",
            },
        )
        .expect("tooling descriptor");

        assert_ne!(for_runtime.cache_key, for_tooling.cache_key);
        assert_eq!(
            for_runtime.plan_fingerprint_sha256,
            for_tooling.plan_fingerprint_sha256
        );
    }
}