Skip to main content

oxgraph_db/
query.rs

1//! Native `OxQL` and pinned Cypher-profile query execution.
2
3use oxgraph_graph::{
4    CanonicalElementIdentity, CanonicalRelationIdentity, EdgeSourceGraph, EdgeTargetGraph,
5    TopologyCounts,
6};
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    DbError, ElementId, IncidenceRecord, ProjectionId, PropertyKeyId, PropertySubject,
11    PropertyValue, RelationId,
12    catalog::{ProjectionDefinition, PropertyFamily},
13    projection::GraphProjection,
14    state::DatabaseState,
15    traversal::{self, TraversalDirection, TraversalOptions},
16    value::parse_value_token,
17};
18
19/// Query frontend language.
20///
21/// # Performance
22///
23/// Copying and comparing are `O(1)`.
24#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
25pub enum QueryLanguage {
26    /// Native topology-first `OxQL` profile.
27    Oxql,
28    /// Pinned Cypher language profile over property-graph projections.
29    Cypher,
30}
31
32/// Prepared query plan.
33///
34/// # Performance
35///
36/// Cloning is `O(query length)`.
37#[derive(Clone, Debug, Eq, PartialEq)]
38pub struct PreparedQuery {
39    /// Source language.
40    language: QueryLanguage,
41    /// Original query text.
42    text: String,
43    /// Physical plan.
44    plan: QueryPlan,
45}
46
47impl PreparedQuery {
48    /// Parses and prepares one query string against current catalog metadata.
49    ///
50    /// # Errors
51    ///
52    /// Returns [`DbError::EmptyQuery`] or [`DbError::UnsupportedQuery`] when the
53    /// query is outside the supported profile.
54    ///
55    /// # Performance
56    ///
57    /// This function is `O(query length + catalog lookup cost)`.
58    pub(crate) fn prepare(
59        language: QueryLanguage,
60        query: &str,
61        state: &DatabaseState,
62    ) -> Result<Self, DbError> {
63        let trimmed = query.trim();
64        if trimmed.is_empty() {
65            return Err(DbError::EmptyQuery);
66        }
67        let plan = match language {
68            QueryLanguage::Oxql => prepare_oxql(trimmed, state)?,
69            QueryLanguage::Cypher => prepare_cypher(trimmed, state)?,
70        };
71        Ok(Self {
72            language,
73            text: trimmed.to_owned(),
74            plan,
75        })
76    }
77
78    /// Returns the query language.
79    ///
80    /// # Performance
81    ///
82    /// This method is `O(1)`.
83    #[must_use]
84    pub const fn language(&self) -> QueryLanguage {
85        self.language
86    }
87
88    /// Returns the source query text.
89    ///
90    /// # Performance
91    ///
92    /// This method is `O(1)`.
93    #[must_use]
94    pub fn text(&self) -> &str {
95        &self.text
96    }
97
98    /// Returns a stable physical explanation.
99    ///
100    /// # Performance
101    ///
102    /// This method is `O(plan size)`.
103    #[must_use]
104    pub fn explain(&self) -> String {
105        self.plan.explain()
106    }
107
108    /// Executes this prepared plan.
109    ///
110    /// # Errors
111    ///
112    /// Returns [`DbError`] when a referenced projection cannot be materialized.
113    ///
114    /// # Performance
115    ///
116    /// This method is `O(plan output + projection build cost when used)`.
117    pub(crate) fn execute(&self, state: &DatabaseState) -> Result<QueryResult, DbError> {
118        self.plan.execute(state)
119    }
120}
121
122/// Query result materialized for JSON, CLI, and embedded consumers.
123///
124/// # Performance
125///
126/// Iterating rows is `O(row count)`.
127#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
128pub struct QueryResult {
129    /// Materialized rows.
130    rows: Vec<QueryRow>,
131}
132
133impl QueryResult {
134    /// Creates a result from rows.
135    ///
136    /// # Performance
137    ///
138    /// This function is `O(1)` excluding row ownership transfer.
139    #[must_use]
140    pub(crate) const fn new(rows: Vec<QueryRow>) -> Self {
141        Self { rows }
142    }
143
144    /// Returns result rows.
145    ///
146    /// # Performance
147    ///
148    /// This method is `O(1)`.
149    #[must_use]
150    pub fn rows(&self) -> &[QueryRow] {
151        &self.rows
152    }
153}
154
155/// One query result row.
156///
157/// # Performance
158///
159/// Cloning is `O(value count + string/property bytes)`.
160#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
161pub struct QueryRow {
162    /// Values carried by the row.
163    pub values: Vec<QueryValue>,
164}
165
166impl QueryRow {
167    /// Creates a single-value row.
168    ///
169    /// # Performance
170    ///
171    /// This function is `O(1)` excluding value ownership transfer.
172    #[must_use]
173    pub(crate) fn single(value: QueryValue) -> Self {
174        Self {
175            values: vec![value],
176        }
177    }
178}
179
180/// Query value variants used by `OxQL` and the pinned Cypher profile.
181///
182/// # Performance
183///
184/// Cloning is `O(value bytes)`.
185#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
186pub enum QueryValue {
187    /// Canonical element id.
188    Element(ElementId),
189    /// Canonical relation id.
190    Relation(RelationId),
191    /// Canonical incidence record.
192    Incidence(IncidenceRecord),
193    /// Property subject.
194    Subject(PropertySubject),
195    /// Typed property value.
196    Property(PropertyValue),
197    /// Human-readable text value.
198    Text(String),
199    /// Projection id.
200    Projection(ProjectionId),
201}
202
203/// Private physical query plan.
204#[derive(Clone, Debug, Eq, PartialEq)]
205enum QueryPlan {
206    /// Scan all elements.
207    ElementScan,
208    /// Scan all relations.
209    RelationScan,
210    /// Scan all incidences.
211    IncidenceScan,
212    /// Scan elements with a label.
213    ElementLabelScan {
214        /// Label name for explanations.
215        label: String,
216        /// Label ID.
217        label_id: crate::LabelId,
218    },
219    /// Scan elements by property equality.
220    ElementPropertyEqual {
221        /// Property key name for explanations.
222        key: String,
223        /// Property key ID.
224        key_id: PropertyKeyId,
225        /// Required property value.
226        value: PropertyValue,
227    },
228    /// Scan relations by relation type.
229    RelationTypeScan {
230        /// Relation type name for explanations.
231        relation_type: String,
232        /// Relation type ID.
233        relation_type_id: crate::RelationTypeId,
234    },
235    /// Walk a graph projection.
236    GraphWalk {
237        /// Projection ID.
238        projection: ProjectionId,
239        /// Canonical starting element.
240        element: ElementId,
241        /// Traversal options.
242        options: TraversalOptions,
243    },
244    /// Cypher property-graph node scan.
245    CypherNodeScan,
246    /// Cypher property-graph node label scan.
247    CypherNodeLabelScan {
248        /// Label name.
249        label: String,
250        /// Label ID.
251        label_id: crate::LabelId,
252    },
253    /// Cypher directed edge triple scan.
254    CypherDirectedTriples {
255        /// Graph projection ID.
256        projection: ProjectionId,
257    },
258    /// Catalog metadata scan.
259    CatalogScan,
260}
261
262impl QueryPlan {
263    /// Explains this physical plan.
264    fn explain(&self) -> String {
265        match self {
266            Self::ElementScan => "oxql scan elements".to_owned(),
267            Self::RelationScan => "oxql scan relations".to_owned(),
268            Self::IncidenceScan => "oxql scan incidences".to_owned(),
269            Self::ElementLabelScan { label, .. } => {
270                format!("oxql label index lookup elements label={label}")
271            }
272            Self::ElementPropertyEqual { key, value, .. } => {
273                format!("oxql property equality lookup elements {key}={value}")
274            }
275            Self::RelationTypeScan { relation_type, .. } => {
276                format!("oxql relation-type index lookup type={relation_type}")
277            }
278            Self::GraphWalk {
279                projection,
280                element,
281                options,
282            } => format!(
283                "oxql graph projection {projection} walk from {element} depth {} direction {:?} limit {}",
284                options.max_depth, options.direction, options.limit,
285            ),
286            Self::CypherNodeScan => "cypher node scan over property-graph projection".to_owned(),
287            Self::CypherNodeLabelScan { label, .. } => {
288                format!("cypher node label scan label={label}")
289            }
290            Self::CypherDirectedTriples { projection } => {
291                format!("cypher directed triple scan projection={projection}")
292            }
293            Self::CatalogScan => "catalog metadata scan".to_owned(),
294        }
295    }
296
297    /// Executes this physical plan.
298    fn execute(&self, state: &DatabaseState) -> Result<QueryResult, DbError> {
299        match self {
300            Self::ElementScan | Self::CypherNodeScan => Ok(scan_elements(state)),
301            Self::RelationScan => Ok(scan_relations(state)),
302            Self::IncidenceScan => Ok(scan_incidences(state)),
303            Self::ElementLabelScan { label_id, .. }
304            | Self::CypherNodeLabelScan { label_id, .. } => {
305                Ok(scan_elements_with_label(state, *label_id))
306            }
307            Self::ElementPropertyEqual { key_id, value, .. } => {
308                scan_elements_with_property(state, *key_id, value)
309            }
310            Self::RelationTypeScan {
311                relation_type_id, ..
312            } => Ok(scan_relations_with_type(state, *relation_type_id)),
313            Self::GraphWalk {
314                projection,
315                element,
316                options,
317            } => execute_graph_walk(state, *projection, *element, *options),
318            Self::CypherDirectedTriples { projection } => {
319                execute_cypher_triples(state, *projection)
320            }
321            Self::CatalogScan => Ok(scan_catalog(state)),
322        }
323    }
324}
325
326/// Prepares native `OxQL`.
327fn prepare_oxql(query: &str, state: &DatabaseState) -> Result<QueryPlan, DbError> {
328    let tokens = query.split_whitespace().collect::<Vec<_>>();
329    let upper = tokens
330        .iter()
331        .map(|token| token.to_ascii_uppercase())
332        .collect::<Vec<_>>();
333    match upper.as_slice() {
334        [command] if command == "CATALOG" => Ok(QueryPlan::CatalogScan),
335        [verb, family] if verb == "MATCH" && family == "ELEMENTS" => Ok(QueryPlan::ElementScan),
336        [verb, family] if verb == "MATCH" && family == "RELATIONS" => Ok(QueryPlan::RelationScan),
337        [verb, family] if verb == "MATCH" && family == "INCIDENCES" => Ok(QueryPlan::IncidenceScan),
338        [verb, family, has, object, _name]
339            if verb == "MATCH" && family == "ELEMENTS" && has == "HAS" && object == "LABEL" =>
340        {
341            prepare_element_label(tokens[4], state)
342        }
343        [verb, family, r#type, _name]
344            if verb == "MATCH" && family == "RELATIONS" && r#type == "TYPE" =>
345        {
346            prepare_relation_type(tokens[3], state)
347        }
348        [verb, family, r#where, _key, equals, _value]
349            if verb == "MATCH" && family == "ELEMENTS" && r#where == "WHERE" && equals == "=" =>
350        {
351            prepare_element_property(tokens[3], tokens[5], state)
352        }
353        [graph, _projection, neighbors, _element]
354            if graph == "GRAPH" && neighbors == "NEIGHBORS" =>
355        {
356            prepare_graph_walk(tokens[1], tokens[3], TraversalOptions::default(), state)
357        }
358        [
359            graph,
360            _projection,
361            walk,
362            from,
363            _element,
364            depth,
365            _max_depth,
366            ..,
367        ] if graph == "GRAPH" && walk == "WALK" && from == "FROM" && depth == "DEPTH" => {
368            prepare_graph_walk_query(&tokens, &upper, state)
369        }
370        _tokens => Err(DbError::unsupported("unsupported OxQL profile query")),
371    }
372}
373
374/// Prepares the pinned Cypher language profile.
375fn prepare_cypher(query: &str, state: &DatabaseState) -> Result<QueryPlan, DbError> {
376    if query == "MATCH (n) RETURN n" {
377        return Ok(QueryPlan::CypherNodeScan);
378    }
379    if query == "MATCH (n)-[r]->(m) RETURN n,r,m" {
380        return Ok(QueryPlan::CypherDirectedTriples {
381            projection: first_graph_projection(state)?,
382        });
383    }
384    if let Some(label) = query
385        .strip_prefix("MATCH (n:")
386        .and_then(|rest| rest.strip_suffix(") RETURN n"))
387    {
388        let label_id = state
389            .catalog()
390            .label_id(label)
391            .ok_or_else(|| DbError::unsupported(format!("unknown Cypher label {label}")))?;
392        return Ok(QueryPlan::CypherNodeLabelScan {
393            label: label.to_owned(),
394            label_id,
395        });
396    }
397    Err(DbError::unsupported("unsupported Cypher profile query"))
398}
399
400/// Prepares an `OxQL` element label scan.
401fn prepare_element_label(label: &str, state: &DatabaseState) -> Result<QueryPlan, DbError> {
402    let label_id = state
403        .catalog()
404        .label_id(label)
405        .ok_or_else(|| DbError::unsupported(format!("unknown label {label}")))?;
406    Ok(QueryPlan::ElementLabelScan {
407        label: label.to_owned(),
408        label_id,
409    })
410}
411
412/// Prepares an `OxQL` relation type scan.
413fn prepare_relation_type(relation_type: &str, state: &DatabaseState) -> Result<QueryPlan, DbError> {
414    let relation_type_id = state
415        .catalog()
416        .relation_type_id(relation_type)
417        .ok_or_else(|| DbError::unsupported(format!("unknown relation type {relation_type}")))?;
418    Ok(QueryPlan::RelationTypeScan {
419        relation_type: relation_type.to_owned(),
420        relation_type_id,
421    })
422}
423
424/// Prepares an `OxQL` property equality scan.
425fn prepare_element_property(
426    key: &str,
427    value: &str,
428    state: &DatabaseState,
429) -> Result<QueryPlan, DbError> {
430    let key_id = state
431        .catalog()
432        .property_key_id(key)
433        .ok_or_else(|| DbError::unsupported(format!("unknown property key {key}")))?;
434    let value = parse_value_token(value).map_err(DbError::unsupported)?;
435    state.validate_lookup_value_for_family(key_id, PropertyFamily::Element, &value)?;
436    Ok(QueryPlan::ElementPropertyEqual {
437        key: key.to_owned(),
438        key_id,
439        value,
440    })
441}
442
443/// Prepares an `OxQL` graph walk query.
444fn prepare_graph_walk_query(
445    tokens: &[&str],
446    upper: &[String],
447    state: &DatabaseState,
448) -> Result<QueryPlan, DbError> {
449    let max_depth = tokens[6]
450        .parse::<usize>()
451        .map_err(|_error| DbError::unsupported("walk depth must be an integer"))?;
452    let mut options = TraversalOptions {
453        max_depth,
454        ..TraversalOptions::default()
455    };
456    let mut saw_direction = false;
457    let mut saw_limit = false;
458    let mut index = 7;
459    while index < tokens.len() {
460        let Some(value) = tokens.get(index + 1) else {
461            return Err(DbError::unsupported("walk option requires a value"));
462        };
463        match upper[index].as_str() {
464            "DIRECTION" if !saw_direction => {
465                options.direction = parse_walk_direction(&upper[index + 1])?;
466                saw_direction = true;
467            }
468            "DIRECTION" => return Err(DbError::unsupported("walk direction specified twice")),
469            "LIMIT" if !saw_limit => {
470                options.limit = value
471                    .parse::<usize>()
472                    .map_err(|_error| DbError::unsupported("walk limit must be an integer"))?;
473                saw_limit = true;
474            }
475            "LIMIT" => return Err(DbError::unsupported("walk limit specified twice")),
476            _option => return Err(DbError::unsupported("unsupported walk option")),
477        }
478        index += 2;
479    }
480    prepare_graph_walk(tokens[1], tokens[4], options, state)
481}
482
483/// Parses one `OxQL` graph walk direction.
484fn parse_walk_direction(direction: &str) -> Result<TraversalDirection, DbError> {
485    match direction {
486        "OUTGOING" => Ok(TraversalDirection::Outgoing),
487        "INCOMING" => Ok(TraversalDirection::Incoming),
488        "BOTH" => Ok(TraversalDirection::Both),
489        _direction => Err(DbError::unsupported(
490            "walk direction must be outgoing, incoming, or both",
491        )),
492    }
493}
494
495/// Prepares an `OxQL` graph traversal.
496fn prepare_graph_walk(
497    projection: &str,
498    element: &str,
499    options: TraversalOptions,
500    state: &DatabaseState,
501) -> Result<QueryPlan, DbError> {
502    let projection = state
503        .catalog()
504        .projection_id(projection)
505        .ok_or_else(|| DbError::unsupported(format!("unknown projection {projection}")))?;
506    let entry = state
507        .catalog()
508        .projection(projection)
509        .ok_or(DbError::UnknownProjection { id: projection })?;
510    if matches!(&entry.definition, ProjectionDefinition::Hypergraph(_)) {
511        return Err(DbError::invalid_projection("projection is not a graph"));
512    }
513    let raw = element
514        .parse::<u64>()
515        .map_err(|_error| DbError::unsupported("element id must be an integer"))?;
516    Ok(QueryPlan::GraphWalk {
517        projection,
518        element: ElementId::new(raw),
519        options,
520    })
521}
522
523/// Scans all elements.
524fn scan_elements(state: &DatabaseState) -> QueryResult {
525    QueryResult::new(
526        state
527            .elements()
528            .map(|record| QueryRow::single(QueryValue::Element(record.id)))
529            .collect(),
530    )
531}
532
533/// Scans all relations.
534fn scan_relations(state: &DatabaseState) -> QueryResult {
535    QueryResult::new(
536        state
537            .relations()
538            .map(|record| QueryRow::single(QueryValue::Relation(record.id)))
539            .collect(),
540    )
541}
542
543/// Scans all incidences.
544fn scan_incidences(state: &DatabaseState) -> QueryResult {
545    QueryResult::new(
546        state
547            .incidences()
548            .map(|record| QueryRow::single(QueryValue::Incidence(*record)))
549            .collect(),
550    )
551}
552
553/// Scans elements with one label.
554fn scan_elements_with_label(state: &DatabaseState, label_id: crate::LabelId) -> QueryResult {
555    QueryResult::new(
556        state
557            .elements_with_label(label_id)
558            .into_iter()
559            .map(|id| QueryRow::single(QueryValue::Element(id)))
560            .collect(),
561    )
562}
563
564/// Scans elements with a property value.
565fn scan_elements_with_property(
566    state: &DatabaseState,
567    key: PropertyKeyId,
568    value: &PropertyValue,
569) -> Result<QueryResult, DbError> {
570    Ok(QueryResult::new(
571        state
572            .typed_property_equal_for_family(key, PropertyFamily::Element, value)?
573            .into_iter()
574            .filter_map(|subject| match subject {
575                PropertySubject::Element(id) => Some(QueryRow::single(QueryValue::Element(id))),
576                PropertySubject::Relation(_) | PropertySubject::Incidence(_) => None,
577            })
578            .collect(),
579    ))
580}
581
582/// Scans relations with one relation type.
583fn scan_relations_with_type(
584    state: &DatabaseState,
585    relation_type: crate::RelationTypeId,
586) -> QueryResult {
587    QueryResult::new(
588        state
589            .relations_with_type(relation_type)
590            .into_iter()
591            .map(|id| QueryRow::single(QueryValue::Relation(id)))
592            .collect(),
593    )
594}
595
596/// Scans catalog entries.
597fn scan_catalog(state: &DatabaseState) -> QueryResult {
598    let rows = state
599        .catalog()
600        .projections()
601        .map(|entry| QueryRow {
602            values: vec![
603                QueryValue::Projection(entry.id),
604                QueryValue::Text(entry.definition.name().to_owned()),
605            ],
606        })
607        .collect();
608    QueryResult::new(rows)
609}
610
611/// Executes a graph walk.
612fn execute_graph_walk(
613    state: &DatabaseState,
614    projection: ProjectionId,
615    element: ElementId,
616    options: TraversalOptions,
617) -> Result<QueryResult, DbError> {
618    let graph = graph_projection(state, projection)?;
619    let traversal = traversal::traverse_graph_projection(&graph, &[element], options)?;
620    let rows = traversal
621        .rows()
622        .iter()
623        .map(|row| QueryRow::single(QueryValue::Element(row.element)))
624        .collect();
625    Ok(QueryResult::new(rows))
626}
627
628/// Executes the Cypher directed triple profile.
629fn execute_cypher_triples(
630    state: &DatabaseState,
631    projection: ProjectionId,
632) -> Result<QueryResult, DbError> {
633    let graph = graph_projection(state, projection)?;
634    let mut rows = Vec::with_capacity(graph.relation_count());
635    for index in 0..graph.relation_count() {
636        let edge = projection_relation(index)?;
637        rows.push(QueryRow {
638            values: vec![
639                QueryValue::Element(graph.canonical_element_id(graph.source(edge))),
640                QueryValue::Relation(graph.canonical_relation_id(edge)),
641                QueryValue::Element(graph.canonical_element_id(graph.target(edge))),
642            ],
643        });
644    }
645    Ok(QueryResult::new(rows))
646}
647
648/// Materializes a graph projection by ID.
649fn graph_projection(
650    state: &DatabaseState,
651    projection: ProjectionId,
652) -> Result<GraphProjection, DbError> {
653    let entry = state
654        .catalog()
655        .projection(projection)
656        .ok_or(DbError::UnknownProjection { id: projection })?;
657    match &entry.definition {
658        ProjectionDefinition::Graph(definition) => {
659            GraphProjection::from_state(state, definition.clone())
660        }
661        ProjectionDefinition::Hypergraph(_definition) => {
662            Err(DbError::invalid_projection("projection is not a graph"))
663        }
664    }
665}
666
667/// Finds the first graph projection in catalog order.
668fn first_graph_projection(state: &DatabaseState) -> Result<ProjectionId, DbError> {
669    state
670        .catalog()
671        .projections()
672        .find_map(|entry| match &entry.definition {
673            ProjectionDefinition::Graph(_definition) => Some(entry.id),
674            ProjectionDefinition::Hypergraph(_definition) => None,
675        })
676        .ok_or_else(|| DbError::unsupported("Cypher profile requires a graph projection"))
677}
678
679/// Builds a projection relation ID for query execution.
680fn projection_relation(index: usize) -> Result<crate::ProjectionRelationId, DbError> {
681    u32::try_from(index)
682        .map(crate::ProjectionRelationId::new)
683        .map_err(|_error| DbError::IdOverflow)
684}