//! plexus-engine 0.3.6
//!
//! Engine integration traits for consuming Plexus plans.
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};

pub use capabilities::{
    check_version_compat, op_ordering_contract, required_capabilities,
    validate_plan_against_capabilities, CapabilityError, EngineCapabilities,
    EngineCapabilityDocument, ExprKind, OpKind, OpOrderingContract, OpOrderingDocument, PlanSemver,
    RequiredCapabilities, VersionRange, ALL_EXPR_KINDS, ALL_OP_KINDS,
};
pub use embedded_profile::{
    assess_embedded_graph_rag_capabilities, assess_embedded_graph_rag_plan,
    validate_embedded_graph_rag_plan, EmbeddedGraphRagAssessment, EmbeddedGraphRagCapabilityGap,
    EmbeddedGraphRagProfileError, EMBEDDED_GRAPH_RAG_PROFILE, EMBEDDED_PHASE_1_RELEASE_CANDIDATE,
    PLEXUS_CAPABILITY_REJECTION_SCENARIO,
};
pub use independent_consumer::{proof_fixture_graph, IndependentConsumerEngine};
use plexus_serde::{
    deserialize_plan, validate_plan_structure, AggFn, CmpOp, ExpandDir, Expr, Op, Plan, SortDir,
};
pub use reference_topology::{
    build_compiled_plan_cache_descriptor, flagship_graph_rag_reference_contract,
    CompiledPlanCacheContract, CompiledPlanCacheDescriptor, CompiledPlanCacheRequest,
    FlagshipGraphRagReferenceContract, COMPILED_PLAN_CACHE_NAMESPACE, COMPILED_PLAN_CACHE_SCHEMA,
    FLAGSHIP_COMPATIBILITY_PROFILE, FLAGSHIP_GRAPH_RAG_REFERENCE_TOPOLOGY,
    FLAGSHIP_PHASE_3_RELEASE_CANDIDATE, RHODIUM_COMPILED_PLAN_CACHE_ROLE,
};

/// A dynamically-typed value as carried in result rows (see [`Row`]).
///
/// Entity references carry only a `u64` id; the ids presumably correspond to
/// [`Node::id`] / [`Relationship::id`] in a [`Graph`].
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// The null value.
    Null,
    /// A boolean.
    Bool(bool),
    /// A 64-bit signed integer.
    Int(i64),
    /// A 64-bit float. Because `PartialEq` is derived, `NaN != NaN`.
    Float(f64),
    /// A UTF-8 string.
    String(String),
    /// A reference to a node by id.
    NodeRef(u64),
    /// A reference to a relationship by id.
    RelRef(u64),
    /// An ordered list of values.
    List(Vec<Value>),
    /// A string-keyed map; `BTreeMap` keeps the keys in sorted order.
    Map(BTreeMap<String, Value>),
}

/// A single result row: column values addressed by position.
pub type Row = Vec<Value>;

/// A crate-private collection of rows, e.g. the row sets the reference engine
/// passes between pipeline stages.
type RowSet = Vec<Row>;

/// The output of executing a plan: result rows plus an optional paging cursor.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryResult {
    /// Result rows, each a positional list of column values.
    pub rows: Vec<Row>,
    /// Engine-emitted cursor for the next page. Present only when the plan
    /// contained a `Limit` with `emit_cursor = true` and rows were truncated.
    /// The reference engine always sets this to `None`; production engines
    /// (e.g. Iridium) encode their internal resume state here.
    pub continuation: Option<Vec<u8>>,
}

impl QueryResult {
    pub fn empty() -> Self {
        Self {
            rows: Vec::new(),
            continuation: None,
        }
    }
}

/// A node in the reference in-memory [`Graph`].
#[derive(Debug, Clone, PartialEq)]
pub struct Node {
    /// Node id; [`Graph::node_by_id`] looks nodes up by this field.
    pub id: u64,
    /// Labels attached to the node (a set, so unordered and without duplicates).
    pub labels: HashSet<String>,
    /// Properties keyed by property name.
    pub props: HashMap<String, Value>,
}

/// A typed relationship in the reference in-memory [`Graph`].
#[derive(Debug, Clone, PartialEq)]
pub struct Relationship {
    /// Relationship id; [`Graph::rel_by_id`] looks relationships up by this field.
    pub id: u64,
    /// Source endpoint (presumably a [`Node::id`]).
    pub src: u64,
    /// Destination endpoint (presumably a [`Node::id`]).
    pub dst: u64,
    /// Relationship type name.
    pub typ: String,
    /// Properties keyed by property name.
    pub props: HashMap<String, Value>,
}

/// Simple in-memory graph representation for the reference engine.
///
/// **Not representative of production performance.** All lookups are O(n) linear
/// scans over `Vec`. Production engines should use indexed storage (e.g.,
/// `HashMap<u64, Node>`) or implement [`StorageAdapter`] to map Plexus ops onto
/// their native storage layer. See `docs/storage-engine-mapping.md`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Graph {
    /// All nodes; id lookups scan this `Vec` front to back.
    pub nodes: Vec<Node>,
    /// All relationships; id lookups scan this `Vec` front to back.
    pub rels: Vec<Relationship>,
}

impl Graph {
    /// O(n) linear scan — production engines should use indexed lookups.
    pub fn node_by_id(&self, id: u64) -> Option<&Node> {
        self.nodes.iter().find(|n| n.id == id)
    }

    /// O(n) linear scan — production engines should use indexed lookups.
    pub fn node_by_id_mut(&mut self, id: u64) -> Option<&mut Node> {
        self.nodes.iter_mut().find(|n| n.id == id)
    }

    /// O(n) linear scan — production engines should use indexed lookups.
    pub fn rel_by_id(&self, id: u64) -> Option<&Relationship> {
        self.rels.iter().find(|r| r.id == id)
    }

    /// O(n) linear scan — production engines should use indexed lookups.
    pub fn rel_by_id_mut(&mut self, id: u64) -> Option<&mut Relationship> {
        self.rels.iter_mut().find(|r| r.id == id)
    }
}

/// Errors reported by the reference engine while executing a plan.
///
/// The variants cover malformed plans (bad op indices), value type mismatches,
/// references to missing graph entities, and ops/expressions the reference
/// engine does not implement.
#[derive(Debug, thiserror::Error)]
pub enum ExecutionError {
    /// An op referenced another op through an invalid index.
    #[error("invalid op reference index {0}")]
    InvalidOpRef(u32),
    /// No output is available for the op at this index.
    #[error("missing op output at index {0}")]
    MissingOpOutput(u32),
    /// The plan's root op index is invalid.
    #[error("invalid root op index {0}")]
    InvalidRootOp(u32),
    /// Column `idx` was accessed on a row holding only `len` columns.
    #[error("row column {idx} out of bounds (len={len})")]
    ColumnOutOfBounds { idx: usize, len: usize },
    /// A boolean value was required but something else was found.
    #[error("expected a boolean value")]
    ExpectedBool,
    /// A numeric value was required but something else was found.
    #[error("expected a numeric value")]
    ExpectedNumeric,
    /// An aggregate expression was required but something else was found.
    #[error("expected aggregate expression")]
    ExpectedAggregateExpr,
    /// The number of sort keys differs from the number of sort directions.
    #[error("sort keys/dirs length mismatch ({keys} vs {dirs})")]
    SortArityMismatch { keys: usize, dirs: usize },
    /// No node with this id exists.
    #[error("unknown node id {0}")]
    UnknownNode(u64),
    /// No relationship with this id exists.
    #[error("unknown relationship id {0}")]
    UnknownRel(u64),
    /// Column `idx` was expected to hold a node reference.
    #[error("expected node reference in column {idx}")]
    ExpectedNodeRef { idx: usize },
    /// Column `idx` was expected to hold a relationship reference.
    #[error("expected relationship reference in column {idx}")]
    ExpectedRelRef { idx: usize },
    /// Column `idx` was expected to hold a node or relationship reference.
    #[error("expected node or relationship reference in column {idx}")]
    ExpectedEntityRef { idx: usize },
    /// A non-detaching delete targeted a node that still has relationships.
    #[error("cannot delete node {0} without detach while relationships exist")]
    DeleteRequiresDetach(u64),
    /// A properties payload was neither a map nor null.
    #[error("expected map or null for properties payload")]
    ExpectedMapPayload,
    /// The reference engine does not implement this op (payload names it).
    #[error("unsupported op in reference engine: {0}")]
    UnsupportedOp(&'static str),
    /// The reference engine does not implement this expression (payload names it).
    #[error("unsupported expression in reference engine: {0}")]
    UnsupportedExpr(&'static str),
    /// A parameter was referenced but no value is bound for it.
    #[error("unbound parameter: {0}")]
    UnboundParam(String),
    /// The plan needs multi-graph support, which the engine declares it lacks.
    #[error("plan requires multi-graph support but engine declares supports_multi_graph=false")]
    MultiGraphUnsupported,
}

/// Error type of [`execute_serialized`], with one variant per pipeline stage:
/// deserialize, validate structure, execute.
///
/// `E` is the engine-specific error type (see [`PlanEngine::Error`]).
#[derive(Debug, thiserror::Error)]
pub enum EngineError<E: std::error::Error + 'static> {
    /// The bytes could not be deserialized into a plan. `#[from]` lets `?`
    /// convert a `SerdeError` directly into this variant.
    #[error("invalid serialized Plexus plan: {0}")]
    Deserialize(#[from] plexus_serde::SerdeError),
    /// The plan failed structural validation. [`execute_serialized`] stores the
    /// first error the validator reported.
    #[error("invalid plan structure: {0}")]
    InvalidStructure(plexus_serde::PlanValidationError),
    /// The engine returned an error while executing the plan.
    #[error("engine execution failed: {0}")]
    Engine(E),
}

/// Engine-facing API for consuming validated/deserialized Plexus plans.
///
/// [`execute_serialized`] deserializes and structurally validates a plan before
/// calling [`PlanEngine::execute_plan`]. When an implementor is called directly,
/// it receives whatever `Plan` the caller passes, validated or not.
pub trait PlanEngine {
    /// Engine-specific execution error; [`execute_serialized`] wraps it in
    /// [`EngineError::Engine`].
    type Error: std::error::Error + Send + Sync + 'static;

    /// Execute a deserialized plan.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the engine fails to execute `plan`.
    fn execute_plan(&mut self, plan: &Plan) -> Result<QueryResult, Self::Error>;
}

/// Optional mutation capability for engines that execute DML ops.
///
/// Entities are addressed by `u64` ids, or by a [`Value`] target (presumably a
/// [`Value::NodeRef`] or [`Value::RelRef`]).
pub trait MutationEngine {
    /// Error type returned when a mutation fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Creates a node with `labels` and `props`. The returned `u64` is
    /// presumably the new node's id.
    fn create_node(
        &mut self,
        labels: &[String],
        props: HashMap<String, Value>,
    ) -> Result<u64, Self::Error>;
    /// Creates a relationship of type `rel_type` with endpoints `src` and `dst`
    /// and properties `props`. The returned `u64` is presumably the new
    /// relationship's id.
    fn create_rel(
        &mut self,
        src: u64,
        dst: u64,
        rel_type: &str,
        props: HashMap<String, Value>,
    ) -> Result<u64, Self::Error>;
    /// Merges `pattern` in the context of `row`, whose columns are presumably
    /// described by `schema`. `on_create_props` / `on_match_props` are presumably
    /// applied when the pattern is newly created vs. already matched.
    fn merge_pattern(
        &mut self,
        pattern: &Expr,
        on_create_props: &Expr,
        on_match_props: &Expr,
        schema: &[plexus_serde::ColDef],
        row: &Row,
    ) -> Result<(), Self::Error>;
    /// Deletes the entity referenced by `target`. `detach` selects detaching
    /// deletion (compare [`ExecutionError::DeleteRequiresDetach`]).
    fn delete_entity(&mut self, target: &Value, detach: bool) -> Result<(), Self::Error>;
    /// Sets property `key` to `value` on the entity referenced by `target`.
    fn set_property(&mut self, target: &Value, key: &str, value: Value) -> Result<(), Self::Error>;
    /// Removes property `key` from the entity referenced by `target`.
    fn remove_property(&mut self, target: &Value, key: &str) -> Result<(), Self::Error>;
}

/// Deserialize, validate, and execute a serialized Plexus plan with the provided engine.
///
/// Validates plan structure (no cycles, valid references, valid root_op) before
/// execution. Returns the first structural error encountered, if any.
///
/// # Errors
///
/// - [`EngineError::Deserialize`] if `bytes` cannot be decoded into a [`Plan`].
/// - [`EngineError::InvalidStructure`] with the first error reported by
///   `validate_plan_structure`. The engine is not invoked in this case.
/// - [`EngineError::Engine`] if [`PlanEngine::execute_plan`] fails.
///
/// # Panics
///
/// Panics if `validate_plan_structure` returns `Err` with an empty error list.
/// That should be impossible, since an `Err` presumably always carries at least
/// one error. Reaching this panic therefore indicates a validator bug.
pub fn execute_serialized<E: PlanEngine>(
    engine: &mut E,
    bytes: &[u8],
) -> Result<QueryResult, EngineError<E::Error>> {
    let plan = deserialize_plan(bytes)?;
    if let Err(errors) = validate_plan_structure(&plan) {
        // Only the first error is surfaced; `EngineError` holds a single cause.
        let first = errors
            .into_iter()
            .next()
            .expect("validate_plan_structure returned Err with an empty error list");
        return Err(EngineError::InvalidStructure(first));
    }
    engine.execute_plan(&plan).map_err(EngineError::Engine)
}

/// Reference in-memory engine for integration testing and conformance validation.
///
/// This engine prioritizes correctness and readability over performance. It uses
/// O(n) linear scans for entity lookups and clones row sets between pipeline
/// stages. **Do not use as a template for production engine performance patterns.**
///
/// Production engines should implement [`PlanEngine`] (and optionally
/// [`MutationEngine`]) against their own storage, or use the [`StorageAdapter`]
/// trait for property-graph mapping. See `docs/engine-implementor-guide.md`.
#[derive(Debug, Clone, Default)]
pub struct InMemoryEngine {
    /// The in-memory graph this engine operates on.
    pub graph: Graph,
    /// Named parameter bindings, managed via `set_param` / `clear_params`.
    /// Presumably resolved when a plan references a parameter (compare
    /// [`ExecutionError::UnboundParam`]).
    pub params: HashMap<String, Value>,
}

impl InMemoryEngine {
    pub fn new(graph: Graph) -> Self {
        Self {
            graph,
            params: HashMap::new(),
        }
    }

    pub fn with_params(graph: Graph, params: HashMap<String, Value>) -> Self {
        Self { graph, params }
    }

    pub fn set_param(&mut self, name: impl Into<String>, value: Value) {
        self.params.insert(name.into(), value);
    }

    pub fn clear_params(&mut self) {
        self.params.clear();
    }
}

/// One entry of a named vector collection held by [`MockVectorEngine`]: an
/// embedding paired with a node id.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorCollectionEntry {
    /// The node this embedding belongs to (presumably a [`Node::id`]).
    pub node_id: u64,
    /// Embedding vector components.
    pub embedding: Vec<f64>,
}

/// Mock engine that pairs an [`InMemoryEngine`] with named vector collections.
#[derive(Debug, Clone, Default)]
pub struct MockVectorEngine {
    /// Underlying in-memory engine that holds the graph and parameter bindings.
    pub base: InMemoryEngine,
    /// Vector collections keyed by collection name.
    pub collections: HashMap<String, Vec<VectorCollectionEntry>>,
}

impl MockVectorEngine {
    pub fn new(graph: Graph) -> Self {
        Self {
            base: InMemoryEngine::new(graph),
            collections: HashMap::new(),
        }
    }

    pub fn with_collections(
        graph: Graph,
        collections: HashMap<String, Vec<VectorCollectionEntry>>,
    ) -> Self {
        Self {
            base: InMemoryEngine::new(graph),
            collections,
        }
    }

    pub fn insert_collection(
        &mut self,
        name: impl Into<String>,
        entries: Vec<VectorCollectionEntry>,
    ) {
        self.collections.insert(name.into(), entries);
    }

    pub fn set_param(&mut self, name: impl Into<String>, value: Value) {
        self.base.set_param(name, value);
    }

    pub fn clear_params(&mut self) {
        self.base.clear_params();
    }
}

mod capabilities;
mod embedded_profile;
mod independent_consumer;
mod reference_topology;
mod storage;
pub use storage::{EntityRef, StorageAdapter};
mod engine;

#[cfg(test)]
mod tests;