Skip to main content

plexus_engine/
lib.rs

1use std::cmp::Ordering;
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4pub use capabilities::{
5    check_version_compat, op_ordering_contract, required_capabilities,
6    validate_plan_against_capabilities, CapabilityError, EngineCapabilities,
7    EngineCapabilityDocument, ExprKind, OpKind, OpOrderingContract, OpOrderingDocument, PlanSemver,
8    RequiredCapabilities, VersionRange, ALL_EXPR_KINDS, ALL_OP_KINDS,
9};
10use plexus_serde::{
11    deserialize_plan, validate_plan_structure, AggFn, CmpOp, ExpandDir, Expr, Op, Plan, SortDir,
12};
13
/// Dynamically typed value used for row cells, entity properties, and
/// engine parameters.
///
/// Equality is the derived `PartialEq`; because `Float` wraps an `f64`,
/// `Value::Float(NaN)` never compares equal to itself.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Null / absent value.
    Null,
    /// Boolean scalar.
    Bool(bool),
    /// Signed 64-bit integer scalar.
    Int(i64),
    /// 64-bit floating-point scalar.
    Float(f64),
    /// Owned string.
    String(String),
    /// Reference to a node (presumably the node's `id` — confirm against the executor).
    NodeRef(u64),
    /// Reference to a relationship (presumably the relationship's `id` — confirm against the executor).
    RelRef(u64),
    /// Ordered sequence of nested values.
    List(Vec<Value>),
    /// String-keyed map of nested values; `BTreeMap` iterates keys in sorted order.
    Map(BTreeMap<String, Value>),
}
26
27pub type Row = Vec<Value>;
28
29type RowSet = Vec<Row>;
30
31#[derive(Debug, Clone, PartialEq)]
32pub struct QueryResult {
33    pub rows: Vec<Row>,
34}
35
36impl QueryResult {
37    pub fn empty() -> Self {
38        Self { rows: Vec::new() }
39    }
40}
41
42#[derive(Debug, Clone, PartialEq)]
43pub struct Node {
44    pub id: u64,
45    pub labels: HashSet<String>,
46    pub props: HashMap<String, Value>,
47}
48
49#[derive(Debug, Clone, PartialEq)]
50pub struct Relationship {
51    pub id: u64,
52    pub src: u64,
53    pub dst: u64,
54    pub typ: String,
55    pub props: HashMap<String, Value>,
56}
57
58/// Simple in-memory graph representation for the reference engine.
59///
60/// **Not representative of production performance.** All lookups are O(n) linear
61/// scans over `Vec`. Production engines should use indexed storage (e.g.,
62/// `HashMap<u64, Node>`) or implement [`StorageAdapter`] to map Plexus ops onto
63/// their native storage layer. See `docs/storage-engine-mapping.md`.
64#[derive(Debug, Clone, PartialEq, Default)]
65pub struct Graph {
66    pub nodes: Vec<Node>,
67    pub rels: Vec<Relationship>,
68}
69
70impl Graph {
71    /// O(n) linear scan — production engines should use indexed lookups.
72    pub fn node_by_id(&self, id: u64) -> Option<&Node> {
73        self.nodes.iter().find(|n| n.id == id)
74    }
75
76    /// O(n) linear scan — production engines should use indexed lookups.
77    pub fn node_by_id_mut(&mut self, id: u64) -> Option<&mut Node> {
78        self.nodes.iter_mut().find(|n| n.id == id)
79    }
80
81    /// O(n) linear scan — production engines should use indexed lookups.
82    pub fn rel_by_id(&self, id: u64) -> Option<&Relationship> {
83        self.rels.iter().find(|r| r.id == id)
84    }
85
86    /// O(n) linear scan — production engines should use indexed lookups.
87    pub fn rel_by_id_mut(&mut self, id: u64) -> Option<&mut Relationship> {
88        self.rels.iter_mut().find(|r| r.id == id)
89    }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum ExecutionError {
94    #[error("invalid op reference index {0}")]
95    InvalidOpRef(u32),
96    #[error("missing op output at index {0}")]
97    MissingOpOutput(u32),
98    #[error("invalid root op index {0}")]
99    InvalidRootOp(u32),
100    #[error("row column {idx} out of bounds (len={len})")]
101    ColumnOutOfBounds { idx: usize, len: usize },
102    #[error("expected a boolean value")]
103    ExpectedBool,
104    #[error("expected a numeric value")]
105    ExpectedNumeric,
106    #[error("expected aggregate expression")]
107    ExpectedAggregateExpr,
108    #[error("sort keys/dirs length mismatch ({keys} vs {dirs})")]
109    SortArityMismatch { keys: usize, dirs: usize },
110    #[error("unknown node id {0}")]
111    UnknownNode(u64),
112    #[error("unknown relationship id {0}")]
113    UnknownRel(u64),
114    #[error("expected node reference in column {idx}")]
115    ExpectedNodeRef { idx: usize },
116    #[error("expected relationship reference in column {idx}")]
117    ExpectedRelRef { idx: usize },
118    #[error("expected node or relationship reference in column {idx}")]
119    ExpectedEntityRef { idx: usize },
120    #[error("cannot delete node {0} without detach while relationships exist")]
121    DeleteRequiresDetach(u64),
122    #[error("expected map or null for properties payload")]
123    ExpectedMapPayload,
124    #[error("unsupported op in reference engine: {0}")]
125    UnsupportedOp(&'static str),
126    #[error("unsupported expression in reference engine: {0}")]
127    UnsupportedExpr(&'static str),
128    #[error("unbound parameter: {0}")]
129    UnboundParam(String),
130    #[error("plan requires multi-graph support but engine declares supports_multi_graph=false")]
131    MultiGraphUnsupported,
132}
133
134#[derive(Debug, thiserror::Error)]
135pub enum EngineError<E: std::error::Error + 'static> {
136    #[error("invalid serialized Plexus plan: {0}")]
137    Deserialize(#[from] plexus_serde::SerdeError),
138    #[error("invalid plan structure: {0}")]
139    InvalidStructure(plexus_serde::PlanValidationError),
140    #[error("engine execution failed: {0}")]
141    Engine(E),
142}
143
144/// Engine-facing API for consuming validated/deserialized Plexus plans.
145pub trait PlanEngine {
146    type Error: std::error::Error + Send + Sync + 'static;
147
148    /// Execute a deserialized plan.
149    fn execute_plan(&mut self, plan: &Plan) -> Result<QueryResult, Self::Error>;
150}
151
152/// Optional mutation capability for engines that execute DML ops.
153pub trait MutationEngine {
154    type Error: std::error::Error + Send + Sync + 'static;
155
156    fn create_node(
157        &mut self,
158        labels: &[String],
159        props: HashMap<String, Value>,
160    ) -> Result<u64, Self::Error>;
161    fn create_rel(
162        &mut self,
163        src: u64,
164        dst: u64,
165        rel_type: &str,
166        props: HashMap<String, Value>,
167    ) -> Result<u64, Self::Error>;
168    fn merge_pattern(
169        &mut self,
170        pattern: &Expr,
171        on_create_props: &Expr,
172        on_match_props: &Expr,
173        schema: &[plexus_serde::ColDef],
174        row: &Row,
175    ) -> Result<(), Self::Error>;
176    fn delete_entity(&mut self, target: &Value, detach: bool) -> Result<(), Self::Error>;
177    fn set_property(&mut self, target: &Value, key: &str, value: Value) -> Result<(), Self::Error>;
178    fn remove_property(&mut self, target: &Value, key: &str) -> Result<(), Self::Error>;
179}
180
181/// Deserialize, validate, and execute a serialized Plexus plan with the provided engine.
182///
183/// Validates plan structure (no cycles, valid references, valid root_op) before
184/// execution. Returns the first structural error encountered, if any.
185pub fn execute_serialized<E: PlanEngine>(
186    engine: &mut E,
187    bytes: &[u8],
188) -> Result<QueryResult, EngineError<E::Error>> {
189    let plan = deserialize_plan(bytes)?;
190    if let Err(errors) = validate_plan_structure(&plan) {
191        return Err(EngineError::InvalidStructure(
192            errors.into_iter().next().unwrap(),
193        ));
194    }
195    engine.execute_plan(&plan).map_err(EngineError::Engine)
196}
197
198/// Reference in-memory engine for integration testing and conformance validation.
199///
200/// This engine prioritizes correctness and readability over performance. It uses
201/// O(n) linear scans for entity lookups and clones row sets between pipeline
202/// stages. **Do not use as a template for production engine performance patterns.**
203///
204/// Production engines should implement [`PlanEngine`] (and optionally
205/// [`MutationEngine`]) against their own storage, or use the [`StorageAdapter`]
206/// trait for property-graph mapping. See `docs/engine-implementor-guide.md`.
207#[derive(Debug, Clone, Default)]
208pub struct InMemoryEngine {
209    pub graph: Graph,
210    pub params: HashMap<String, Value>,
211}
212
213impl InMemoryEngine {
214    pub fn new(graph: Graph) -> Self {
215        Self {
216            graph,
217            params: HashMap::new(),
218        }
219    }
220
221    pub fn with_params(graph: Graph, params: HashMap<String, Value>) -> Self {
222        Self { graph, params }
223    }
224
225    pub fn set_param(&mut self, name: impl Into<String>, value: Value) {
226        self.params.insert(name.into(), value);
227    }
228
229    pub fn clear_params(&mut self) {
230        self.params.clear();
231    }
232}
233
/// One entry of a vector collection: an embedding paired with a node id.
#[derive(Debug, Clone, PartialEq)]
pub struct VectorCollectionEntry {
    /// Id of the node this embedding belongs to (presumably a `Node::id` — confirm).
    pub node_id: u64,
    /// Embedding components.
    pub embedding: Vec<f64>,
}
239
240#[derive(Debug, Clone, Default)]
241pub struct MockVectorEngine {
242    pub base: InMemoryEngine,
243    pub collections: HashMap<String, Vec<VectorCollectionEntry>>,
244}
245
246impl MockVectorEngine {
247    pub fn new(graph: Graph) -> Self {
248        Self {
249            base: InMemoryEngine::new(graph),
250            collections: HashMap::new(),
251        }
252    }
253
254    pub fn with_collections(
255        graph: Graph,
256        collections: HashMap<String, Vec<VectorCollectionEntry>>,
257    ) -> Self {
258        Self {
259            base: InMemoryEngine::new(graph),
260            collections,
261        }
262    }
263
264    pub fn insert_collection(
265        &mut self,
266        name: impl Into<String>,
267        entries: Vec<VectorCollectionEntry>,
268    ) {
269        self.collections.insert(name.into(), entries);
270    }
271
272    pub fn set_param(&mut self, name: impl Into<String>, value: Value) {
273        self.base.set_param(name, value);
274    }
275
276    pub fn clear_params(&mut self) {
277        self.base.clear_params();
278    }
279}
280
281mod capabilities;
282mod storage;
283pub use storage::{EntityRef, StorageAdapter};
284mod engine;
285
286#[cfg(test)]
287mod tests;