1use crate::{Error, Result, SearchResult};
4
5use super::Database;
6
7#[allow(deprecated)] impl Database {
9 fn canonical_json(value: serde_json::Value) -> serde_json::Value {
19 match value {
20 serde_json::Value::Object(map) => {
21 let sorted: serde_json::Map<String, serde_json::Value> = map
28 .into_iter()
29 .map(|(k, v)| (k, Self::canonical_json(v)))
30 .collect::<std::collections::BTreeMap<_, _>>()
31 .into_iter()
32 .collect();
33 serde_json::Value::Object(sorted)
34 }
35 serde_json::Value::Array(arr) => {
36 serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
37 }
38 other => other,
39 }
40 }
41
42 #[must_use]
58 pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
59 use std::hash::{BuildHasher, Hasher};
60
61 let query_text = serde_json::to_value(query)
65 .map(Self::canonical_json)
66 .and_then(|v| serde_json::to_string(&v))
67 .unwrap_or_else(|_| format!("{query:?}"));
68
69 let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
70 hasher.write(query_text.as_bytes());
71 let query_hash = hasher.finish();
72
73 let schema_version = self.schema_version();
74 let collection_names = Self::referenced_collection_names(query);
75
76 let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
78 .iter()
79 .map(|name| self.collection_write_generation(name).unwrap_or(0))
80 .collect();
81
82 crate::cache::PlanKey {
83 query_hash,
84 schema_version,
85 collection_generations,
86 }
87 }
88
89 pub fn explain_query(
108 &self,
109 query: &crate::velesql::Query,
110 ) -> Result<crate::velesql::QueryPlan> {
111 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
112
113 let plan_key = self.build_plan_key(query);
114
115 if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
116 let mut plan = cached.plan.clone();
117 plan.cache_hit = Some(true);
118 plan.plan_reuse_count = Some(
119 cached
120 .reuse_count
121 .load(std::sync::atomic::Ordering::Relaxed),
122 );
123 return Ok(plan);
124 }
125
126 let mut plan = crate::velesql::QueryPlan::from_select(&query.select);
127 plan.cache_hit = Some(false);
128 plan.plan_reuse_count = Some(0);
129 Ok(plan)
130 }
131
132 #[allow(clippy::too_many_lines)]
142 pub fn execute_query(
143 &self,
144 query: &crate::velesql::Query,
145 params: &std::collections::HashMap<String, serde_json::Value>,
146 ) -> Result<Vec<SearchResult>> {
147 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
148
149 if let Some(train) = query.train.as_ref() {
150 return self.execute_train(train);
151 }
152
153 if let Some(dml) = query.dml.as_ref() {
154 return self.execute_dml(dml, params);
155 }
156
157 if query.is_match_query() {
158 return Err(Error::Query(
159 "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
160 .to_string(),
161 ));
162 }
163
164 let pre_exec_key = self.build_plan_key(query);
171 let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
172
173 let results = self.execute_select_query(query, params)?;
174
175 if !is_cached {
185 self.populate_plan_cache(query);
186 }
187
188 Ok(results)
189 }
190
191 fn execute_select_query(
193 &self,
194 query: &crate::velesql::Query,
195 params: &std::collections::HashMap<String, serde_json::Value>,
196 ) -> Result<Vec<SearchResult>> {
197 let compound_limit = Some(100_000_u64);
202 let left_results = if query.compound.is_some() {
203 let mut left_query = query.clone();
204 left_query.select.limit = compound_limit;
205 self.execute_single_select(&left_query, params)?
206 } else {
207 return self.execute_single_select(query, params);
208 };
209
210 if let Some(ref compound) = query.compound {
212 let mut accumulated = left_results;
213 for (operator, right_select) in &compound.operations {
214 let mut right_query = crate::velesql::Query::new_select(right_select.clone());
215 right_query.select.limit = compound_limit;
216 let right_results = self.execute_single_select(&right_query, params)?;
217 accumulated = crate::collection::search::query::set_operations::apply_set_operation(
218 accumulated,
219 right_results,
220 *operator,
221 );
222 }
223 if let Some(limit) = query.select.limit {
225 accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
226 }
227 return Ok(accumulated);
228 }
229
230 Ok(left_results)
231 }
232
233 fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
239 let mut names = vec![query.select.from.clone()];
240 for join in &query.select.joins {
241 names.push(join.table.clone());
242 }
243 if let Some(ref compound) = query.compound {
244 for (_, right_select) in &compound.operations {
245 names.push(right_select.from.clone());
246 for join in &right_select.joins {
247 names.push(join.table.clone());
248 }
249 }
250 }
251 names.sort();
252 names.dedup();
253 names
254 }
255
256 #[allow(deprecated)]
265 pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
266 self.get_collection(name)
267 .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
268 .or_else(|| self.get_metadata_collection(name).map(|mc| mc.inner))
269 .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
270 }
271
272 #[allow(deprecated)]
278 pub(super) fn resolve_writable_collection(
279 &self,
280 name: &str,
281 ) -> Result<crate::collection::Collection> {
282 self.get_collection(name)
283 .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
284 .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
285 }
286
287 fn execute_single_select(
289 &self,
290 query: &crate::velesql::Query,
291 params: &std::collections::HashMap<String, serde_json::Value>,
292 ) -> Result<Vec<SearchResult>> {
293 let base_collection = self.resolve_collection(&query.select.from)?;
294
295 let mut single_query = query.clone();
300 single_query.compound = None;
301
302 if single_query.select.joins.is_empty() {
303 return base_collection.execute_query(&single_query, params);
304 }
305
306 single_query.select.joins.clear();
307
308 let mut results = base_collection.execute_query(&single_query, params)?;
309 for join in &query.select.joins {
310 let join_collection = self.resolve_collection(&join.table)?;
311 let column_store = Self::build_join_column_store(&join_collection)?;
312 let joined = crate::collection::search::query::join::execute_join(
313 &results,
314 join,
315 &column_store,
316 )?;
317 results = crate::collection::search::query::join::joined_to_search_results(joined);
318 }
319 Ok(results)
320 }
321
322 fn populate_plan_cache(&self, query: &crate::velesql::Query) {
324 let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
325 plan: crate::velesql::QueryPlan::from_select(&query.select),
326 referenced_collections: Self::referenced_collection_names(query),
327 compiled_at: std::time::Instant::now(),
328 reuse_count: std::sync::atomic::AtomicU64::new(0),
329 });
330 let post_exec_key = self.build_plan_key(query);
332 self.compiled_plan_cache.insert(post_exec_key, compiled);
333 }
334
335 pub(super) fn execute_dml(
337 &self,
338 dml: &crate::velesql::DmlStatement,
339 params: &std::collections::HashMap<String, serde_json::Value>,
340 ) -> Result<Vec<SearchResult>> {
341 match dml {
342 crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
343 crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
344 }
345 }
346
347 #[allow(deprecated)]
349 fn execute_insert(
350 &self,
351 stmt: &crate::velesql::InsertStatement,
352 params: &std::collections::HashMap<String, serde_json::Value>,
353 ) -> Result<Vec<SearchResult>> {
354 let collection = self.resolve_writable_collection(&stmt.table)?;
355
356 let (id, vector, payload) = Self::resolve_insert_fields(stmt, params)?;
357 let point_id =
358 id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
359 let point = Self::build_insert_point(&collection, point_id, vector, payload)?;
360
361 let result = SearchResult::new(point.clone(), 0.0);
362 collection.upsert(vec![point])?;
363 Ok(vec![result])
364 }
365
366 #[allow(clippy::type_complexity)] fn resolve_insert_fields(
369 stmt: &crate::velesql::InsertStatement,
370 params: &std::collections::HashMap<String, serde_json::Value>,
371 ) -> Result<(
372 Option<u64>,
373 Option<Vec<f32>>,
374 serde_json::Map<String, serde_json::Value>,
375 )> {
376 let mut id: Option<u64> = None;
377 let mut payload = serde_json::Map::new();
378 let mut vector: Option<Vec<f32>> = None;
379
380 for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
381 let resolved = Self::resolve_dml_value(value_expr, params)?;
382 if column == "id" {
383 id = Some(Self::json_to_u64_id(&resolved)?);
384 continue;
385 }
386 if column == "vector" {
387 vector = Some(Self::json_to_vector(&resolved)?);
388 continue;
389 }
390 payload.insert(column.clone(), resolved);
391 }
392
393 Ok((id, vector, payload))
394 }
395
396 fn build_insert_point(
398 collection: &crate::Collection,
399 point_id: u64,
400 vector: Option<Vec<f32>>,
401 payload: serde_json::Map<String, serde_json::Value>,
402 ) -> Result<crate::Point> {
403 if collection.is_metadata_only() {
404 if vector.is_some() {
405 return Err(Error::Query(
406 "INSERT on metadata-only collection cannot set 'vector'".to_string(),
407 ));
408 }
409 Ok(crate::Point::metadata_only(
410 point_id,
411 serde_json::Value::Object(payload),
412 ))
413 } else {
414 let vec_value = vector.ok_or_else(|| {
415 Error::Query("INSERT on vector collection requires 'vector' column".to_string())
416 })?;
417 Ok(crate::Point::new(
418 point_id,
419 vec_value,
420 Some(serde_json::Value::Object(payload)),
421 ))
422 }
423 }
424
425 #[allow(deprecated)]
427 fn execute_update(
428 &self,
429 stmt: &crate::velesql::UpdateStatement,
430 params: &std::collections::HashMap<String, serde_json::Value>,
431 ) -> Result<Vec<SearchResult>> {
432 let collection = self.resolve_writable_collection(&stmt.table)?;
433
434 let assignments = Self::resolve_update_assignments(stmt, params)?;
435 let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;
436
437 let all_ids = collection.all_ids();
438 let rows = collection.get(&all_ids);
439 let updated_points =
440 Self::apply_update_assignments(&collection, rows, filter.as_ref(), &assignments)?;
441
442 Self::upsert_and_collect(&collection, updated_points)
443 }
444
445 fn resolve_update_assignments(
447 stmt: &crate::velesql::UpdateStatement,
448 params: &std::collections::HashMap<String, serde_json::Value>,
449 ) -> Result<Vec<(String, serde_json::Value)>> {
450 let assignments = stmt
451 .assignments
452 .iter()
453 .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
454 .collect::<Result<Vec<_>>>()?;
455
456 if assignments.iter().any(|(name, _)| name == "id") {
457 return Err(Error::Query(
458 "UPDATE cannot modify primary key column 'id'".to_string(),
459 ));
460 }
461 Ok(assignments)
462 }
463
464 #[allow(deprecated)]
466 fn upsert_and_collect(
467 collection: &crate::Collection,
468 updated_points: Vec<crate::Point>,
469 ) -> Result<Vec<SearchResult>> {
470 if updated_points.is_empty() {
471 return Ok(Vec::new());
472 }
473 let results = updated_points
474 .iter()
475 .map(|p| SearchResult::new(p.clone(), 0.0))
476 .collect();
477 collection.upsert(updated_points)?;
478 Ok(results)
479 }
480
481 fn apply_update_assignments(
483 collection: &crate::Collection,
484 rows: Vec<Option<crate::Point>>,
485 filter: Option<&crate::Filter>,
486 assignments: &[(String, serde_json::Value)],
487 ) -> Result<Vec<crate::Point>> {
488 let mut updated_points = Vec::new();
489 for point in rows.into_iter().flatten() {
490 if !Self::matches_update_filter(&point, filter) {
491 continue;
492 }
493
494 let mut payload_map = point
495 .payload
496 .as_ref()
497 .and_then(serde_json::Value::as_object)
498 .cloned()
499 .unwrap_or_default();
500
501 let mut updated_vector = point.vector.clone();
502
503 for (field, value) in assignments {
504 if field == "vector" {
505 if collection.is_metadata_only() {
506 return Err(Error::Query(
507 "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
508 ));
509 }
510 updated_vector = Self::json_to_vector(value)?;
511 } else {
512 payload_map.insert(field.clone(), value.clone());
513 }
514 }
515
516 let updated = if collection.is_metadata_only() {
517 crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
518 } else {
519 crate::Point::new(
520 point.id,
521 updated_vector,
522 Some(serde_json::Value::Object(payload_map)),
523 )
524 };
525 updated_points.push(updated);
526 }
527 Ok(updated_points)
528 }
529}