Skip to main content

velesdb_core/database/
query_engine.rs

1//! Query execution: `execute_query`, `explain_query`, plan caching, and DML dispatch.
2
3use crate::{Error, Result, SearchResult};
4
5use super::Database;
6
7#[allow(deprecated)] // Uses legacy Collection internally for query routing.
8impl Database {
9    /// Produces a canonical JSON string for a `serde_json::Value`.
10    ///
11    /// Recursively sorts the keys of every JSON object so that two values
12    /// representing the same logical structure always produce identical bytes,
13    /// regardless of the `HashMap` iteration order used during serialization.
14    ///
15    /// This is required because `FusionConfig::params` and
16    /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
17    /// them in hash-order, which is non-deterministic across invocations.
18    fn canonical_json(value: serde_json::Value) -> serde_json::Value {
19        match value {
20            serde_json::Value::Object(map) => {
21                // Without the `preserve_order` feature flag, `serde_json::Map` is already
22                // backed by `BTreeMap` and therefore already sorted. This explicit sort
23                // step is kept as defense-in-depth: if `preserve_order` is ever enabled
24                // in `Cargo.toml` (which switches the backing store to `IndexMap` and
25                // preserves insertion order), the canonical key ordering is still upheld
26                // without any change to this function.
27                let sorted: serde_json::Map<String, serde_json::Value> = map
28                    .into_iter()
29                    .map(|(k, v)| (k, Self::canonical_json(v)))
30                    .collect::<std::collections::BTreeMap<_, _>>()
31                    .into_iter()
32                    .collect();
33                serde_json::Value::Object(sorted)
34            }
35            serde_json::Value::Array(arr) => {
36                serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
37            }
38            other => other,
39        }
40    }
41
42    /// Builds a deterministic cache key for a query (CACHE-02).
43    ///
44    /// Serialises the query to canonical JSON (object keys sorted recursively),
45    /// reads the current `schema_version`, and gathers per-collection
46    /// `write_generation` counters (sorted by collection name) to form a
47    /// `PlanKey`.
48    ///
49    /// # Why canonical JSON instead of `Debug`
50    ///
51    /// `format!("{query:?}")` is non-deterministic when the `Query` AST
52    /// contains `HashMap`-backed fields (`FusionConfig::params`,
53    /// `TrainStatement::params`) because `HashMap` iteration order is not
54    /// guaranteed across invocations. Canonical JSON with sorted object keys
55    /// is stable and produces the same byte sequence for logically identical
56    /// queries.
57    #[must_use]
58    pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
59        use std::hash::{BuildHasher, Hasher};
60
61        // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
62        // Fallback to Debug representation if serialization fails (should never happen in
63        // practice since all Query fields are Serialize, but erring on the side of liveness).
64        let query_text = serde_json::to_value(query)
65            .map(Self::canonical_json)
66            .and_then(|v| serde_json::to_string(&v))
67            .unwrap_or_else(|_| format!("{query:?}"));
68
69        let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
70        hasher.write(query_text.as_bytes());
71        let query_hash = hasher.finish();
72
73        let schema_version = self.schema_version();
74        let collection_names = Self::referenced_collection_names(query);
75
76        // Build generations vector in sorted collection order.
77        let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
78            .iter()
79            .map(|name| self.collection_write_generation(name).unwrap_or(0))
80            .collect();
81
82        crate::cache::PlanKey {
83            query_hash,
84            schema_version,
85            collection_generations,
86        }
87    }
88
89    /// Returns the query plan for a query, with cache status populated (CACHE-02).
90    ///
91    /// If the plan is cached, returns it with `cache_hit: Some(true)` and
92    /// `plan_reuse_count` set. Otherwise generates a fresh plan with
93    /// `cache_hit: Some(false)`.
94    ///
95    /// # Design decision: `explain_query` does not populate the cache
96    ///
97    /// `explain_query` intentionally does **not** insert a new plan into the
98    /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
99    /// influence cache state would make cache metrics (hit/miss ratios,
100    /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
101    /// indistinguishable from real execution hits. Only `execute_query` is
102    /// authorised to write to the cache.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the query is invalid.
107    pub fn explain_query(
108        &self,
109        query: &crate::velesql::Query,
110    ) -> Result<crate::velesql::QueryPlan> {
111        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
112
113        let plan_key = self.build_plan_key(query);
114
115        if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
116            let mut plan = cached.plan.clone();
117            plan.cache_hit = Some(true);
118            plan.plan_reuse_count = Some(
119                cached
120                    .reuse_count
121                    .load(std::sync::atomic::Ordering::Relaxed),
122            );
123            return Ok(plan);
124        }
125
126        let mut plan = crate::velesql::QueryPlan::from_select(&query.select);
127        plan.cache_hit = Some(false);
128        plan.plan_reuse_count = Some(0);
129        Ok(plan)
130    }
131
132    /// Executes a `VelesQL` query with database-level JOIN resolution.
133    ///
134    /// This method resolves JOIN target collections from the database registry
135    /// and executes JOIN runtime in sequence. Query plans are cached and
136    /// reused for identical queries against unchanged collections (CACHE-02).
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the base collection or any JOIN collection is missing.
141    #[allow(clippy::too_many_lines)]
142    pub fn execute_query(
143        &self,
144        query: &crate::velesql::Query,
145        params: &std::collections::HashMap<String, serde_json::Value>,
146    ) -> Result<Vec<SearchResult>> {
147        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
148
149        if let Some(train) = query.train.as_ref() {
150            return self.execute_train(train);
151        }
152
153        if let Some(dml) = query.dml.as_ref() {
154            return self.execute_dml(dml, params);
155        }
156
157        if query.is_match_query() {
158            return Err(Error::Query(
159                "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
160                    .to_string(),
161            ));
162        }
163
164        // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
165        //
166        // `contains()` is used instead of `get().is_some()` so that this
167        // existence check does not increment the hit/miss counters or
168        // `reuse_count`. Only `explain_query` (which surfaces these values to
169        // callers) should call `get()`.
170        let pre_exec_key = self.build_plan_key(query);
171        let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
172
173        let results = self.execute_select_query(query, params)?;
174
175        // Populate cache on miss (CACHE-02).
176        //
177        // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
178        // pre-execution `contains()` check and here, a concurrent writer may
179        // have bumped a collection's `write_generation` (e.g. via `upsert` on
180        // another thread). Rebuilding the key captures the post-execution
181        // state, so the cached plan is associated with the generation that was
182        // live when the plan was actually compiled — not a potentially stale
183        // pre-execution snapshot.
184        if !is_cached {
185            self.populate_plan_cache(query);
186        }
187
188        Ok(results)
189    }
190
191    /// Executes the SELECT portion of a query, resolving JOINs if present.
192    fn execute_select_query(
193        &self,
194        query: &crate::velesql::Query,
195        params: &std::collections::HashMap<String, serde_json::Value>,
196    ) -> Result<Vec<SearchResult>> {
197        // EPIC-040 US-006: For compound queries, strip LIMIT from each operand so
198        // the set operation sees the full result sets.  The final LIMIT is applied
199        // once on the merged output (SQL-standard behaviour).
200        // Use MAX_LIMIT (not None) to avoid the default-10 cap downstream.
201        let compound_limit = Some(100_000_u64);
202        let left_results = if query.compound.is_some() {
203            let mut left_query = query.clone();
204            left_query.select.limit = compound_limit;
205            self.execute_single_select(&left_query, params)?
206        } else {
207            return self.execute_single_select(query, params);
208        };
209
210        // compound is guaranteed Some here (non-compound returns above).
211        if let Some(ref compound) = query.compound {
212            let mut right_query = crate::velesql::Query::new_select(*compound.right.clone());
213            right_query.select.limit = compound_limit;
214            let right_results = self.execute_single_select(&right_query, params)?;
215            let mut merged = crate::collection::search::query::set_operations::apply_set_operation(
216                left_results,
217                right_results,
218                compound.operator,
219            );
220            // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
221            if let Some(limit) = query.select.limit {
222                merged.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
223            }
224            return Ok(merged);
225        }
226
227        Ok(left_results)
228    }
229
230    /// Collects sorted, deduplicated collection names referenced by a query.
231    ///
232    /// RF-DEDUP: Shared by `build_plan_key` and `populate_plan_cache`, which
233    /// both need the same sorted collection-name list from the query AST.
234    fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
235        let mut names = vec![query.select.from.clone()];
236        for join in &query.select.joins {
237            names.push(join.table.clone());
238        }
239        names.sort();
240        names.dedup();
241        names
242    }
243
244    /// Resolves a collection by name from all registries (legacy, vector, metadata).
245    ///
246    /// Priority: legacy collections registry first (contains live instances for both
247    /// `create_collection` and `create_vector_collection` via shared inner `Arc<>`).
248    /// Falls back to vector collections, then metadata collections.
249    ///
250    /// RF-DEDUP: Shared by `execute_single_select` (reads are valid on all collection
251    /// types, including metadata).
252    #[allow(deprecated)]
253    pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
254        self.get_collection(name)
255            .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
256            .or_else(|| self.get_metadata_collection(name).map(|mc| mc.inner))
257            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
258    }
259
260    /// Resolves a collection that supports write operations (INSERT/UPDATE/TRAIN).
261    ///
262    /// Only checks legacy and vector collections — metadata-only collections do not
263    /// support INSERT with vectors, UPDATE with vectors, or TRAIN QUANTIZER, so
264    /// resolving them here would produce misleading errors deeper in the pipeline.
265    #[allow(deprecated)]
266    pub(super) fn resolve_writable_collection(
267        &self,
268        name: &str,
269    ) -> Result<crate::collection::Collection> {
270        self.get_collection(name)
271            .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
272            .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
273    }
274
275    /// Executes a single SELECT (no compound), resolving JOINs if present.
276    fn execute_single_select(
277        &self,
278        query: &crate::velesql::Query,
279        params: &std::collections::HashMap<String, serde_json::Value>,
280    ) -> Result<Vec<SearchResult>> {
281        let base_collection = self.resolve_collection(&query.select.from)?;
282
283        // Strip compound from the query before delegating to Collection::execute_query,
284        // because compound handling is done by execute_select_query (our caller).
285        // Without this, the set operation would be applied twice (once at Collection
286        // level, once here) — causing e.g. UNION ALL to duplicate right-side results.
287        let mut single_query = query.clone();
288        single_query.compound = None;
289
290        if single_query.select.joins.is_empty() {
291            return base_collection.execute_query(&single_query, params);
292        }
293
294        single_query.select.joins.clear();
295
296        let mut results = base_collection.execute_query(&single_query, params)?;
297        for join in &query.select.joins {
298            let join_collection = self.resolve_collection(&join.table)?;
299            let column_store = Self::build_join_column_store(&join_collection)?;
300            let joined = crate::collection::search::query::join::execute_join(
301                &results,
302                join,
303                &column_store,
304            )?;
305            results = crate::collection::search::query::join::joined_to_search_results(joined);
306        }
307        Ok(results)
308    }
309
310    /// Inserts a compiled plan into the cache after a cache miss (CACHE-02).
311    fn populate_plan_cache(&self, query: &crate::velesql::Query) {
312        let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
313            plan: crate::velesql::QueryPlan::from_select(&query.select),
314            referenced_collections: Self::referenced_collection_names(query),
315            compiled_at: std::time::Instant::now(),
316            reuse_count: std::sync::atomic::AtomicU64::new(0),
317        });
318        // Rebuild key after execution to reflect current write_generation (C-1).
319        let post_exec_key = self.build_plan_key(query);
320        self.compiled_plan_cache.insert(post_exec_key, compiled);
321    }
322
323    /// Dispatches a DML statement (INSERT or UPDATE).
324    pub(super) fn execute_dml(
325        &self,
326        dml: &crate::velesql::DmlStatement,
327        params: &std::collections::HashMap<String, serde_json::Value>,
328    ) -> Result<Vec<SearchResult>> {
329        match dml {
330            crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
331            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
332        }
333    }
334
335    /// Executes an INSERT statement.
336    #[allow(deprecated)]
337    fn execute_insert(
338        &self,
339        stmt: &crate::velesql::InsertStatement,
340        params: &std::collections::HashMap<String, serde_json::Value>,
341    ) -> Result<Vec<SearchResult>> {
342        let collection = self.resolve_writable_collection(&stmt.table)?;
343
344        let (id, vector, payload) = Self::resolve_insert_fields(stmt, params)?;
345        let point_id =
346            id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
347        let point = Self::build_insert_point(&collection, point_id, vector, payload)?;
348
349        let result = SearchResult::new(point.clone(), 0.0);
350        collection.upsert(vec![point])?;
351        Ok(vec![result])
352    }
353
354    /// Resolves column values from an INSERT statement into id, vector, and payload fields.
355    #[allow(clippy::type_complexity)] // Reason: one-off tuple return for internal helper.
356    fn resolve_insert_fields(
357        stmt: &crate::velesql::InsertStatement,
358        params: &std::collections::HashMap<String, serde_json::Value>,
359    ) -> Result<(
360        Option<u64>,
361        Option<Vec<f32>>,
362        serde_json::Map<String, serde_json::Value>,
363    )> {
364        let mut id: Option<u64> = None;
365        let mut payload = serde_json::Map::new();
366        let mut vector: Option<Vec<f32>> = None;
367
368        for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
369            let resolved = Self::resolve_dml_value(value_expr, params)?;
370            if column == "id" {
371                id = Some(Self::json_to_u64_id(&resolved)?);
372                continue;
373            }
374            if column == "vector" {
375                vector = Some(Self::json_to_vector(&resolved)?);
376                continue;
377            }
378            payload.insert(column.clone(), resolved);
379        }
380
381        Ok((id, vector, payload))
382    }
383
384    /// Builds a `Point` for an INSERT statement, validating vector presence.
385    fn build_insert_point(
386        collection: &crate::Collection,
387        point_id: u64,
388        vector: Option<Vec<f32>>,
389        payload: serde_json::Map<String, serde_json::Value>,
390    ) -> Result<crate::Point> {
391        if collection.is_metadata_only() {
392            if vector.is_some() {
393                return Err(Error::Query(
394                    "INSERT on metadata-only collection cannot set 'vector'".to_string(),
395                ));
396            }
397            Ok(crate::Point::metadata_only(
398                point_id,
399                serde_json::Value::Object(payload),
400            ))
401        } else {
402            let vec_value = vector.ok_or_else(|| {
403                Error::Query("INSERT on vector collection requires 'vector' column".to_string())
404            })?;
405            Ok(crate::Point::new(
406                point_id,
407                vec_value,
408                Some(serde_json::Value::Object(payload)),
409            ))
410        }
411    }
412
413    /// Executes an UPDATE statement.
414    #[allow(deprecated)]
415    fn execute_update(
416        &self,
417        stmt: &crate::velesql::UpdateStatement,
418        params: &std::collections::HashMap<String, serde_json::Value>,
419    ) -> Result<Vec<SearchResult>> {
420        let collection = self.resolve_writable_collection(&stmt.table)?;
421
422        let assignments = Self::resolve_update_assignments(stmt, params)?;
423        let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;
424
425        let all_ids = collection.all_ids();
426        let rows = collection.get(&all_ids);
427        let updated_points =
428            Self::apply_update_assignments(&collection, rows, filter.as_ref(), &assignments)?;
429
430        Self::upsert_and_collect(&collection, updated_points)
431    }
432
433    /// Resolves and validates UPDATE assignment values.
434    fn resolve_update_assignments(
435        stmt: &crate::velesql::UpdateStatement,
436        params: &std::collections::HashMap<String, serde_json::Value>,
437    ) -> Result<Vec<(String, serde_json::Value)>> {
438        let assignments = stmt
439            .assignments
440            .iter()
441            .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
442            .collect::<Result<Vec<_>>>()?;
443
444        if assignments.iter().any(|(name, _)| name == "id") {
445            return Err(Error::Query(
446                "UPDATE cannot modify primary key column 'id'".to_string(),
447            ));
448        }
449        Ok(assignments)
450    }
451
452    /// Upserts updated points and returns them as search results.
453    #[allow(deprecated)]
454    fn upsert_and_collect(
455        collection: &crate::Collection,
456        updated_points: Vec<crate::Point>,
457    ) -> Result<Vec<SearchResult>> {
458        if updated_points.is_empty() {
459            return Ok(Vec::new());
460        }
461        let results = updated_points
462            .iter()
463            .map(|p| SearchResult::new(p.clone(), 0.0))
464            .collect();
465        collection.upsert(updated_points)?;
466        Ok(results)
467    }
468
469    /// Applies field assignments to matching points, producing updated points.
470    fn apply_update_assignments(
471        collection: &crate::Collection,
472        rows: Vec<Option<crate::Point>>,
473        filter: Option<&crate::Filter>,
474        assignments: &[(String, serde_json::Value)],
475    ) -> Result<Vec<crate::Point>> {
476        let mut updated_points = Vec::new();
477        for point in rows.into_iter().flatten() {
478            if !Self::matches_update_filter(&point, filter) {
479                continue;
480            }
481
482            let mut payload_map = point
483                .payload
484                .as_ref()
485                .and_then(serde_json::Value::as_object)
486                .cloned()
487                .unwrap_or_default();
488
489            let mut updated_vector = point.vector.clone();
490
491            for (field, value) in assignments {
492                if field == "vector" {
493                    if collection.is_metadata_only() {
494                        return Err(Error::Query(
495                            "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
496                        ));
497                    }
498                    updated_vector = Self::json_to_vector(value)?;
499                } else {
500                    payload_map.insert(field.clone(), value.clone());
501                }
502            }
503
504            let updated = if collection.is_metadata_only() {
505                crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
506            } else {
507                crate::Point::new(
508                    point.id,
509                    updated_vector,
510                    Some(serde_json::Value::Object(payload_map)),
511                )
512            };
513            updated_points.push(updated);
514        }
515        Ok(updated_points)
516    }
517}