1use crate::{Error, Result, SearchResult};
4
5use super::Database;
6
7#[allow(deprecated)] impl Database {
9 fn canonical_json(value: serde_json::Value) -> serde_json::Value {
19 match value {
20 serde_json::Value::Object(map) => {
21 let sorted: serde_json::Map<String, serde_json::Value> = map
28 .into_iter()
29 .map(|(k, v)| (k, Self::canonical_json(v)))
30 .collect::<std::collections::BTreeMap<_, _>>()
31 .into_iter()
32 .collect();
33 serde_json::Value::Object(sorted)
34 }
35 serde_json::Value::Array(arr) => {
36 serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
37 }
38 other => other,
39 }
40 }
41
42 #[must_use]
58 pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
59 use std::hash::{BuildHasher, Hasher};
60
61 let query_text = serde_json::to_value(query)
65 .map(Self::canonical_json)
66 .and_then(|v| serde_json::to_string(&v))
67 .unwrap_or_else(|_| format!("{query:?}"));
68
69 let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
70 hasher.write(query_text.as_bytes());
71 let query_hash = hasher.finish();
72
73 let schema_version = self.schema_version();
74 let collection_names = Self::referenced_collection_names(query);
75
76 let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
78 .iter()
79 .map(|name| self.collection_write_generation(name).unwrap_or(0))
80 .collect();
81
82 crate::cache::PlanKey {
83 query_hash,
84 schema_version,
85 collection_generations,
86 }
87 }
88
89 pub fn explain_query(
108 &self,
109 query: &crate::velesql::Query,
110 ) -> Result<crate::velesql::QueryPlan> {
111 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
112
113 let plan_key = self.build_plan_key(query);
114
115 if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
116 let mut plan = cached.plan.clone();
117 plan.cache_hit = Some(true);
118 plan.plan_reuse_count = Some(
119 cached
120 .reuse_count
121 .load(std::sync::atomic::Ordering::Relaxed),
122 );
123 return Ok(plan);
124 }
125
126 let mut plan = crate::velesql::QueryPlan::from_select(&query.select);
127 plan.cache_hit = Some(false);
128 plan.plan_reuse_count = Some(0);
129 Ok(plan)
130 }
131
132 #[allow(clippy::too_many_lines)]
142 pub fn execute_query(
143 &self,
144 query: &crate::velesql::Query,
145 params: &std::collections::HashMap<String, serde_json::Value>,
146 ) -> Result<Vec<SearchResult>> {
147 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
148
149 if let Some(train) = query.train.as_ref() {
150 return self.execute_train(train);
151 }
152
153 if let Some(dml) = query.dml.as_ref() {
154 return self.execute_dml(dml, params);
155 }
156
157 if query.is_match_query() {
158 return Err(Error::Query(
159 "Database::execute_query does not support top-level MATCH queries. Use Collection::execute_query or pass the collection name."
160 .to_string(),
161 ));
162 }
163
164 let pre_exec_key = self.build_plan_key(query);
171 let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
172
173 let results = self.execute_select_query(query, params)?;
174
175 if !is_cached {
185 self.populate_plan_cache(query);
186 }
187
188 Ok(results)
189 }
190
191 fn execute_select_query(
193 &self,
194 query: &crate::velesql::Query,
195 params: &std::collections::HashMap<String, serde_json::Value>,
196 ) -> Result<Vec<SearchResult>> {
197 let compound_limit = Some(100_000_u64);
202 let left_results = if query.compound.is_some() {
203 let mut left_query = query.clone();
204 left_query.select.limit = compound_limit;
205 self.execute_single_select(&left_query, params)?
206 } else {
207 return self.execute_single_select(query, params);
208 };
209
210 if let Some(ref compound) = query.compound {
212 let mut right_query = crate::velesql::Query::new_select(*compound.right.clone());
213 right_query.select.limit = compound_limit;
214 let right_results = self.execute_single_select(&right_query, params)?;
215 let mut merged = crate::collection::search::query::set_operations::apply_set_operation(
216 left_results,
217 right_results,
218 compound.operator,
219 );
220 if let Some(limit) = query.select.limit {
222 merged.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
223 }
224 return Ok(merged);
225 }
226
227 Ok(left_results)
228 }
229
230 fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
235 let mut names = vec![query.select.from.clone()];
236 for join in &query.select.joins {
237 names.push(join.table.clone());
238 }
239 names.sort();
240 names.dedup();
241 names
242 }
243
244 #[allow(deprecated)]
253 pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
254 self.get_collection(name)
255 .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
256 .or_else(|| self.get_metadata_collection(name).map(|mc| mc.inner))
257 .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
258 }
259
260 #[allow(deprecated)]
266 pub(super) fn resolve_writable_collection(
267 &self,
268 name: &str,
269 ) -> Result<crate::collection::Collection> {
270 self.get_collection(name)
271 .or_else(|| self.get_vector_collection(name).map(|vc| vc.inner))
272 .ok_or_else(|| Error::CollectionNotFound(name.to_string()))
273 }
274
275 fn execute_single_select(
277 &self,
278 query: &crate::velesql::Query,
279 params: &std::collections::HashMap<String, serde_json::Value>,
280 ) -> Result<Vec<SearchResult>> {
281 let base_collection = self.resolve_collection(&query.select.from)?;
282
283 let mut single_query = query.clone();
288 single_query.compound = None;
289
290 if single_query.select.joins.is_empty() {
291 return base_collection.execute_query(&single_query, params);
292 }
293
294 single_query.select.joins.clear();
295
296 let mut results = base_collection.execute_query(&single_query, params)?;
297 for join in &query.select.joins {
298 let join_collection = self.resolve_collection(&join.table)?;
299 let column_store = Self::build_join_column_store(&join_collection)?;
300 let joined = crate::collection::search::query::join::execute_join(
301 &results,
302 join,
303 &column_store,
304 )?;
305 results = crate::collection::search::query::join::joined_to_search_results(joined);
306 }
307 Ok(results)
308 }
309
310 fn populate_plan_cache(&self, query: &crate::velesql::Query) {
312 let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
313 plan: crate::velesql::QueryPlan::from_select(&query.select),
314 referenced_collections: Self::referenced_collection_names(query),
315 compiled_at: std::time::Instant::now(),
316 reuse_count: std::sync::atomic::AtomicU64::new(0),
317 });
318 let post_exec_key = self.build_plan_key(query);
320 self.compiled_plan_cache.insert(post_exec_key, compiled);
321 }
322
323 pub(super) fn execute_dml(
325 &self,
326 dml: &crate::velesql::DmlStatement,
327 params: &std::collections::HashMap<String, serde_json::Value>,
328 ) -> Result<Vec<SearchResult>> {
329 match dml {
330 crate::velesql::DmlStatement::Insert(stmt) => self.execute_insert(stmt, params),
331 crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
332 }
333 }
334
335 #[allow(deprecated)]
337 fn execute_insert(
338 &self,
339 stmt: &crate::velesql::InsertStatement,
340 params: &std::collections::HashMap<String, serde_json::Value>,
341 ) -> Result<Vec<SearchResult>> {
342 let collection = self.resolve_writable_collection(&stmt.table)?;
343
344 let (id, vector, payload) = Self::resolve_insert_fields(stmt, params)?;
345 let point_id =
346 id.ok_or_else(|| Error::Query("INSERT requires integer 'id' column".to_string()))?;
347 let point = Self::build_insert_point(&collection, point_id, vector, payload)?;
348
349 let result = SearchResult::new(point.clone(), 0.0);
350 collection.upsert(vec![point])?;
351 Ok(vec![result])
352 }
353
354 #[allow(clippy::type_complexity)] fn resolve_insert_fields(
357 stmt: &crate::velesql::InsertStatement,
358 params: &std::collections::HashMap<String, serde_json::Value>,
359 ) -> Result<(
360 Option<u64>,
361 Option<Vec<f32>>,
362 serde_json::Map<String, serde_json::Value>,
363 )> {
364 let mut id: Option<u64> = None;
365 let mut payload = serde_json::Map::new();
366 let mut vector: Option<Vec<f32>> = None;
367
368 for (column, value_expr) in stmt.columns.iter().zip(&stmt.values) {
369 let resolved = Self::resolve_dml_value(value_expr, params)?;
370 if column == "id" {
371 id = Some(Self::json_to_u64_id(&resolved)?);
372 continue;
373 }
374 if column == "vector" {
375 vector = Some(Self::json_to_vector(&resolved)?);
376 continue;
377 }
378 payload.insert(column.clone(), resolved);
379 }
380
381 Ok((id, vector, payload))
382 }
383
384 fn build_insert_point(
386 collection: &crate::Collection,
387 point_id: u64,
388 vector: Option<Vec<f32>>,
389 payload: serde_json::Map<String, serde_json::Value>,
390 ) -> Result<crate::Point> {
391 if collection.is_metadata_only() {
392 if vector.is_some() {
393 return Err(Error::Query(
394 "INSERT on metadata-only collection cannot set 'vector'".to_string(),
395 ));
396 }
397 Ok(crate::Point::metadata_only(
398 point_id,
399 serde_json::Value::Object(payload),
400 ))
401 } else {
402 let vec_value = vector.ok_or_else(|| {
403 Error::Query("INSERT on vector collection requires 'vector' column".to_string())
404 })?;
405 Ok(crate::Point::new(
406 point_id,
407 vec_value,
408 Some(serde_json::Value::Object(payload)),
409 ))
410 }
411 }
412
413 #[allow(deprecated)]
415 fn execute_update(
416 &self,
417 stmt: &crate::velesql::UpdateStatement,
418 params: &std::collections::HashMap<String, serde_json::Value>,
419 ) -> Result<Vec<SearchResult>> {
420 let collection = self.resolve_writable_collection(&stmt.table)?;
421
422 let assignments = Self::resolve_update_assignments(stmt, params)?;
423 let filter = Self::build_update_filter(stmt.where_clause.as_ref())?;
424
425 let all_ids = collection.all_ids();
426 let rows = collection.get(&all_ids);
427 let updated_points =
428 Self::apply_update_assignments(&collection, rows, filter.as_ref(), &assignments)?;
429
430 Self::upsert_and_collect(&collection, updated_points)
431 }
432
433 fn resolve_update_assignments(
435 stmt: &crate::velesql::UpdateStatement,
436 params: &std::collections::HashMap<String, serde_json::Value>,
437 ) -> Result<Vec<(String, serde_json::Value)>> {
438 let assignments = stmt
439 .assignments
440 .iter()
441 .map(|a| Ok((a.column.clone(), Self::resolve_dml_value(&a.value, params)?)))
442 .collect::<Result<Vec<_>>>()?;
443
444 if assignments.iter().any(|(name, _)| name == "id") {
445 return Err(Error::Query(
446 "UPDATE cannot modify primary key column 'id'".to_string(),
447 ));
448 }
449 Ok(assignments)
450 }
451
452 #[allow(deprecated)]
454 fn upsert_and_collect(
455 collection: &crate::Collection,
456 updated_points: Vec<crate::Point>,
457 ) -> Result<Vec<SearchResult>> {
458 if updated_points.is_empty() {
459 return Ok(Vec::new());
460 }
461 let results = updated_points
462 .iter()
463 .map(|p| SearchResult::new(p.clone(), 0.0))
464 .collect();
465 collection.upsert(updated_points)?;
466 Ok(results)
467 }
468
469 fn apply_update_assignments(
471 collection: &crate::Collection,
472 rows: Vec<Option<crate::Point>>,
473 filter: Option<&crate::Filter>,
474 assignments: &[(String, serde_json::Value)],
475 ) -> Result<Vec<crate::Point>> {
476 let mut updated_points = Vec::new();
477 for point in rows.into_iter().flatten() {
478 if !Self::matches_update_filter(&point, filter) {
479 continue;
480 }
481
482 let mut payload_map = point
483 .payload
484 .as_ref()
485 .and_then(serde_json::Value::as_object)
486 .cloned()
487 .unwrap_or_default();
488
489 let mut updated_vector = point.vector.clone();
490
491 for (field, value) in assignments {
492 if field == "vector" {
493 if collection.is_metadata_only() {
494 return Err(Error::Query(
495 "UPDATE on metadata-only collection cannot set 'vector'".to_string(),
496 ));
497 }
498 updated_vector = Self::json_to_vector(value)?;
499 } else {
500 payload_map.insert(field.clone(), value.clone());
501 }
502 }
503
504 let updated = if collection.is_metadata_only() {
505 crate::Point::metadata_only(point.id, serde_json::Value::Object(payload_map))
506 } else {
507 crate::Point::new(
508 point.id,
509 updated_vector,
510 Some(serde_json::Value::Object(payload_map)),
511 )
512 };
513 updated_points.push(updated);
514 }
515 Ok(updated_points)
516 }
517}