Skip to main content

velesdb_core/database/
query_engine.rs

1//! Query execution: `execute_query`, `explain_query`, plan caching, and DML dispatch.
2
3use crate::{Error, Result, SearchResult};
4
5use super::Database;
6
7#[allow(deprecated)] // Uses legacy Collection internally for query routing.
8impl Database {
9    /// Produces a canonical JSON string for a `serde_json::Value`.
10    ///
11    /// Recursively sorts the keys of every JSON object so that two values
12    /// representing the same logical structure always produce identical bytes,
13    /// regardless of the `HashMap` iteration order used during serialization.
14    ///
15    /// This is required because `FusionConfig::params` and
16    /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
17    /// them in hash-order, which is non-deterministic across invocations.
18    fn canonical_json(value: serde_json::Value) -> serde_json::Value {
19        match value {
20            serde_json::Value::Object(map) => {
21                // Without the `preserve_order` feature flag, `serde_json::Map` is already
22                // backed by `BTreeMap` and therefore already sorted. This explicit sort
23                // step is kept as defense-in-depth: if `preserve_order` is ever enabled
24                // in `Cargo.toml` (which switches the backing store to `IndexMap` and
25                // preserves insertion order), the canonical key ordering is still upheld
26                // without any change to this function.
27                let sorted: serde_json::Map<String, serde_json::Value> = map
28                    .into_iter()
29                    .map(|(k, v)| (k, Self::canonical_json(v)))
30                    .collect::<std::collections::BTreeMap<_, _>>()
31                    .into_iter()
32                    .collect();
33                serde_json::Value::Object(sorted)
34            }
35            serde_json::Value::Array(arr) => {
36                serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
37            }
38            other => other,
39        }
40    }
41
42    /// Builds a deterministic cache key for a query (CACHE-02).
43    ///
44    /// Serialises the query to canonical JSON (object keys sorted recursively),
45    /// reads the current `schema_version`, and gathers per-collection
46    /// `write_generation` counters (sorted by collection name) to form a
47    /// `PlanKey`.
48    ///
49    /// # Why canonical JSON instead of `Debug`
50    ///
51    /// `format!("{query:?}")` is non-deterministic when the `Query` AST
52    /// contains `HashMap`-backed fields (`FusionConfig::params`,
53    /// `TrainStatement::params`) because `HashMap` iteration order is not
54    /// guaranteed across invocations. Canonical JSON with sorted object keys
55    /// is stable and produces the same byte sequence for logically identical
56    /// queries.
57    #[must_use]
58    pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
59        use std::hash::{BuildHasher, Hasher};
60
61        // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
62        // Fallback to Debug representation if serialization fails (should never happen in
63        // practice since all Query fields are Serialize, but erring on the side of liveness).
64        let query_text = serde_json::to_value(query)
65            .map(Self::canonical_json)
66            .and_then(|v| serde_json::to_string(&v))
67            .unwrap_or_else(|_| format!("{query:?}"));
68
69        let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
70        hasher.write(query_text.as_bytes());
71        let query_hash = hasher.finish();
72
73        let schema_version = self.schema_version();
74        let collection_names = Self::referenced_collection_names(query);
75
76        // Build generations vector in sorted collection order.
77        let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
78            .iter()
79            .map(|name| self.collection_write_generation(name).unwrap_or(0))
80            .collect();
81
82        crate::cache::PlanKey {
83            query_hash,
84            schema_version,
85            collection_generations,
86        }
87    }
88
89    /// Returns the query plan for a query, with cache status populated (CACHE-02).
90    ///
91    /// If the plan is cached, returns it with `cache_hit: Some(true)` and
92    /// `plan_reuse_count` set. Otherwise generates a fresh plan with
93    /// `cache_hit: Some(false)`.
94    ///
95    /// # Design decision: `explain_query` does not populate the cache
96    ///
97    /// `explain_query` intentionally does **not** insert a new plan into the
98    /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
99    /// influence cache state would make cache metrics (hit/miss ratios,
100    /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
101    /// indistinguishable from real execution hits. Only `execute_query` is
102    /// authorised to write to the cache.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the query is invalid.
107    pub fn explain_query(
108        &self,
109        query: &crate::velesql::Query,
110    ) -> Result<crate::velesql::QueryPlan> {
111        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
112
113        let plan_key = self.build_plan_key(query);
114
115        if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
116            let mut plan = cached.plan.clone();
117            plan.cache_hit = Some(true);
118            plan.plan_reuse_count = Some(
119                cached
120                    .reuse_count
121                    .load(std::sync::atomic::Ordering::Relaxed),
122            );
123            return Ok(plan);
124        }
125
126        let mut plan = crate::velesql::QueryPlan::from_select(&query.select);
127        plan.cache_hit = Some(false);
128        plan.plan_reuse_count = Some(0);
129        Ok(plan)
130    }
131
132    /// Executes a `VelesQL` query with database-level JOIN resolution.
133    ///
134    /// This method resolves JOIN target collections from the database registry
135    /// and executes JOIN runtime in sequence. Query plans are cached and
136    /// reused for identical queries against unchanged collections (CACHE-02).
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the base collection or any JOIN collection is missing.
141    #[allow(clippy::too_many_lines)]
142    pub fn execute_query(
143        &self,
144        query: &crate::velesql::Query,
145        params: &std::collections::HashMap<String, serde_json::Value>,
146    ) -> Result<Vec<SearchResult>> {
147        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
148
149        if let Some(train) = query.train.as_ref() {
150            return self.execute_train(train);
151        }
152
153        if let Some(dml) = query.dml.as_ref() {
154            return self.execute_dml(dml, params);
155        }
156
157        if query.is_match_query() {
158            return Err(Error::Query(
159                "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
160                    .to_string(),
161            ));
162        }
163
164        // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
165        //
166        // `contains()` is used instead of `get().is_some()` so that this
167        // existence check does not increment the hit/miss counters or
168        // `reuse_count`. Only `explain_query` (which surfaces these values to
169        // callers) should call `get()`.
170        let pre_exec_key = self.build_plan_key(query);
171        let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
172
173        let results = self.execute_select_query(query, params)?;
174
175        // Populate cache on miss (CACHE-02).
176        //
177        // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
178        // pre-execution `contains()` check and here, a concurrent writer may
179        // have bumped a collection's `write_generation` (e.g. via `upsert` on
180        // another thread). Rebuilding the key captures the post-execution
181        // state, so the cached plan is associated with the generation that was
182        // live when the plan was actually compiled — not a potentially stale
183        // pre-execution snapshot.
184        if !is_cached {
185            self.populate_plan_cache(query);
186        }
187
188        Ok(results)
189    }
190
191    /// Executes the SELECT portion of a query, resolving JOINs if present.
192    fn execute_select_query(
193        &self,
194        query: &crate::velesql::Query,
195        params: &std::collections::HashMap<String, serde_json::Value>,
196    ) -> Result<Vec<SearchResult>> {
197        // EPIC-040 US-006: For compound queries, strip LIMIT from each operand so
198        // the set operation sees the full result sets.  The final LIMIT is applied
199        // once on the merged output (SQL-standard behaviour).
200        // Use MAX_LIMIT (not None) to avoid the default-10 cap downstream.
201        let compound_limit = Some(100_000_u64);
202        let left_results = if query.compound.is_some() {
203            let mut left_query = query.clone();
204            left_query.select.limit = compound_limit;
205            self.execute_single_select(&left_query, params)?
206        } else {
207            return self.execute_single_select(query, params);
208        };
209
210        // compound is guaranteed Some here (non-compound returns above).
211        if let Some(ref compound) = query.compound {
212            let mut accumulated = left_results;
213            for (operator, right_select) in &compound.operations {
214                let mut right_query = crate::velesql::Query::new_select(right_select.clone());
215                right_query.select.limit = compound_limit;
216                let right_results = self.execute_single_select(&right_query, params)?;
217                accumulated = crate::collection::search::query::set_operations::apply_set_operation(
218                    accumulated,
219                    right_results,
220                    *operator,
221                );
222            }
223            // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
224            if let Some(limit) = query.select.limit {
225                accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
226            }
227            return Ok(accumulated);
228        }
229
230        Ok(left_results)
231    }
232
233    /// Collects sorted, deduplicated collection names referenced by a query,
234    /// including all compound operands (UNION, INTERSECT, EXCEPT).
235    ///
236    /// RF-DEDUP: Shared by `build_plan_key` and `populate_plan_cache`, which
237    /// both need the same sorted collection-name list from the query AST.
238    fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
239        let mut names = vec![query.select.from.clone()];
240        for join in &query.select.joins {
241            names.push(join.table.clone());
242        }
243        if let Some(ref compound) = query.compound {
244            for (_, right_select) in &compound.operations {
245                names.push(right_select.from.clone());
246                for join in &right_select.joins {
247                    names.push(join.table.clone());
248                }
249            }
250        }
251        names.sort();
252        names.dedup();
253        names
254    }
255
256    /// Resolves a collection by name from all registries (legacy, vector, metadata).
257    ///
258    /// Priority: legacy collections registry first (contains live instances for both
259    /// `create_collection` and `create_vector_collection` via shared inner `Arc<>`).
260    /// Falls back to vector collections, then metadata collections.
261    ///
262    /// RF-DEDUP: Shared by `execute_single_select` (reads are valid on all collection
263    /// types, including metadata).
264    #[allow(deprecated)]
265    pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
266        self.get_collection(name)
267            .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
268            .or_else(|| self.get_metadata_collection(name).map(|mc| mc.inner))
269            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
270    }
271
272    /// Resolves a collection that supports write operations (INSERT/UPDATE/TRAIN).
273    ///
274    /// Only checks legacy and vector collections — metadata-only collections do not
275    /// support INSERT with vectors, UPDATE with vectors, or TRAIN QUANTIZER, so
276    /// resolving them here would produce misleading errors deeper in the pipeline.
277    #[allow(deprecated)]
278    pub(super) fn resolve_writable_collection(
279        &self,
280        name: &str,
281    ) -> Result<crate::collection::Collection> {
282        self.get_collection(name)
283            .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
284            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
285    }
286
287    /// Executes a single SELECT (no compound), resolving JOINs if present.
288    fn execute_single_select(
289        &self,
290        query: &crate::velesql::Query,
291        params: &std::collections::HashMap<String, serde_json::Value>,
292    ) -> Result<Vec<SearchResult>> {
293        let base_collection = self.resolve_collection(&query.select.from)?;
294
295        // Strip compound from the query before delegating to Collection::execute_query,
296        // because compound handling is done by execute_select_query (our caller).
297        // Without this, the set operation would be applied twice (once at Collection
298        // level, once here) — causing e.g. UNION ALL to duplicate right-side results.
299        let mut single_query = query.clone();
300        single_query.compound = None;
301
302        if single_query.select.joins.is_empty() {
303            return base_collection.execute_query(&single_query, params);
304        }
305
306        single_query.select.joins.clear();
307
308        let mut results = base_collection.execute_query(&single_query, params)?;
309        for join in &query.select.joins {
310            let join_collection = self.resolve_collection(&join.table)?;
311            let column_store = Self::build_join_column_store(&join_collection)?;
312            let joined = crate::collection::search::query::join::execute_join(
313                &results,
314                join,
315                &column_store,
316            )?;
317            results = crate::collection::search::query::join::joined_to_search_results(joined);
318        }
319        Ok(results)
320    }
321
322    /// Inserts a compiled plan into the cache after a cache miss (CACHE-02).
323    fn populate_plan_cache(&self, query: &crate::velesql::Query) {
324        let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
325            plan: crate::velesql::QueryPlan::from_select(&query.select),
326            referenced_collections: Self::referenced_collection_names(query),
327            compiled_at: std::time::Instant::now(),
328            reuse_count: std::sync::atomic::AtomicU64::new(0),
329        });
330        // Rebuild key after execution to reflect current write_generation (C-1).
331        let post_exec_key = self.build_plan_key(query);
332        self.compiled_plan_cache.insert(post_exec_key, compiled);
333    }
334
335    /// Dispatches a DML statement (INSERT or UPDATE).
336    pub(super) fn execute_dml(
337        &self,
338        dml: &crate::velesql::DmlStatement,
339        params: &std::collections::HashMap<String, serde_json::Value>,
340    ) -> Result<Vec<SearchResult>> {
341        match dml {
342            crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
343            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
344        }
345    }
346
347    /// Executes an INSERT statement.
348    #[allow(deprecated)]
349    fn execute_insert(
350        &self,
351        stmt: &crate::velesql::InsertStatement,
352        params: &std::collections::HashMap<String, serde_json::Value>,
353    ) -> Result<Vec<SearchResult>> {
354        let collection = self.resolve_writable_collection(&stmt.table)?;
355
356        let (id, vector, payload) = Self::resolve_insert_fields(stmt, params)?;
357        let point_id =
358            id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
359        let point = Self::build_insert_point(&collection, point_id, vector, payload)?;
360
361        let result = SearchResult::new(point.clone(), 0.0);
362        collection.upsert(vec![point])?;
363        Ok(vec![result])
364    }
365
366    /// Resolves column values from an INSERT statement into id, vector, and payload fields.
367    #[allow(clippy::type_complexity)] // Reason: one-off tuple return for internal helper.
368    fn resolve_insert_fields(
369        stmt: &crate::velesql::InsertStatement,
370        params: &std::collections::HashMap<String, serde_json::Value>,
371    ) -> Result<(
372        Option<u64>,
373        Option<Vec<f32>>,
374        serde_json::Map<String, serde_json::Value>,
375    )> {
376        let mut id: Option<u64> = None;
377        let mut payload = serde_json::Map::new();
378        let mut vector: Option<Vec<f32>> = None;
379
380        for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
381            let resolved = Self::resolve_dml_value(value_expr, params)?;
382            if column == "id" {
383                id = Some(Self::json_to_u64_id(&resolved)?);
384                continue;
385            }
386            if column == "vector" {
387                vector = Some(Self::json_to_vector(&resolved)?);
388                continue;
389            }
390            payload.insert(column.clone(), resolved);
391        }
392
393        Ok((id, vector, payload))
394    }
395
396    /// Builds a `Point` for an INSERT statement, validating vector presence.
397    fn build_insert_point(
398        collection: &crate::Collection,
399        point_id: u64,
400        vector: Option<Vec<f32>>,
401        payload: serde_json::Map<String, serde_json::Value>,
402    ) -> Result<crate::Point> {
403        if collection.is_metadata_only() {
404            if vector.is_some() {
405                return Err(Error::Query(
406                    "INSERT on metadata-only collection cannot set 'vector'".to_string(),
407                ));
408            }
409            Ok(crate::Point::metadata_only(
410                point_id,
411                serde_json::Value::Object(payload),
412            ))
413        } else {
414            let vec_value = vector.ok_or_else(|| {
415                Error::Query("INSERT on vector collection requires 'vector' column".to_string())
416            })?;
417            Ok(crate::Point::new(
418                point_id,
419                vec_value,
420                Some(serde_json::Value::Object(payload)),
421            ))
422        }
423    }
424
425    /// Executes an UPDATE statement.
426    #[allow(deprecated)]
427    fn execute_update(
428        &self,
429        stmt: &crate::velesql::UpdateStatement,
430        params: &std::collections::HashMap<String, serde_json::Value>,
431    ) -> Result<Vec<SearchResult>> {
432        let collection = self.resolve_writable_collection(&stmt.table)?;
433
434        let assignments = Self::resolve_update_assignments(stmt, params)?;
435        let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;
436
437        let all_ids = collection.all_ids();
438        let rows = collection.get(&all_ids);
439        let updated_points =
440            Self::apply_update_assignments(&collection, rows, filter.as_ref(), &assignments)?;
441
442        Self::upsert_and_collect(&collection, updated_points)
443    }
444
445    /// Resolves and validates UPDATE assignment values.
446    fn resolve_update_assignments(
447        stmt: &crate::velesql::UpdateStatement,
448        params: &std::collections::HashMap<String, serde_json::Value>,
449    ) -> Result<Vec<(String, serde_json::Value)>> {
450        let assignments = stmt
451            .assignments
452            .iter()
453            .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
454            .collect::<Result<Vec<_>>>()?;
455
456        if assignments.iter().any(|(name, _)| name == "id") {
457            return Err(Error::Query(
458                "UPDATE cannot modify primary key column 'id'".to_string(),
459            ));
460        }
461        Ok(assignments)
462    }
463
464    /// Upserts updated points and returns them as search results.
465    #[allow(deprecated)]
466    fn upsert_and_collect(
467        collection: &crate::Collection,
468        updated_points: Vec<crate::Point>,
469    ) -> Result<Vec<SearchResult>> {
470        if updated_points.is_empty() {
471            return Ok(Vec::new());
472        }
473        let results = updated_points
474            .iter()
475            .map(|p| SearchResult::new(p.clone(), 0.0))
476            .collect();
477        collection.upsert(updated_points)?;
478        Ok(results)
479    }
480
481    /// Applies field assignments to matching points, producing updated points.
482    fn apply_update_assignments(
483        collection: &crate::Collection,
484        rows: Vec<Option<crate::Point>>,
485        filter: Option<&crate::Filter>,
486        assignments: &[(String, serde_json::Value)],
487    ) -> Result<Vec<crate::Point>> {
488        let mut updated_points = Vec::new();
489        for point in rows.into_iter().flatten() {
490            if !Self::matches_update_filter(&point, filter) {
491                continue;
492            }
493
494            let mut payload_map = point
495                .payload
496                .as_ref()
497                .and_then(serde_json::Value::as_object)
498                .cloned()
499                .unwrap_or_default();
500
501            let mut updated_vector = point.vector.clone();
502
503            for (field, value) in assignments {
504                if field == "vector" {
505                    if collection.is_metadata_only() {
506                        return Err(Error::Query(
507                            "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
508                        ));
509                    }
510                    updated_vector = Self::json_to_vector(value)?;
511                } else {
512                    payload_map.insert(field.clone(), value.clone());
513                }
514            }
515
516            let updated = if collection.is_metadata_only() {
517                crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
518            } else {
519                crate::Point::new(
520                    point.id,
521                    updated_vector,
522                    Some(serde_json::Value::Object(payload_map)),
523                )
524            };
525            updated_points.push(updated);
526        }
527        Ok(updated_points)
528    }
529}