1use super::config::{VectorQuery, VectorQueryOptimizer, VectorQueryResult, VectorServiceArg};
4use super::cross_language::CrossLanguageProcessor;
5use super::monitoring::PerformanceMonitor;
6use crate::{
7 clustering::{ClusteringAlgorithm, ClusteringConfig, ClusteringEngine},
8 embeddings::{EmbeddableContent, EmbeddingManager},
9 graph_aware_search::{GraphAwareSearch, GraphContext, GraphSearchScope},
10 VectorStore,
11};
12use anyhow::{anyhow, Result};
13use std::collections::HashMap;
14use std::time::Instant;
15
/// Executes vector-service queries against a [`VectorStore`], with optional
/// query optimization, result caching, performance monitoring,
/// cross-language query expansion and graph-aware search.
pub struct QueryExecutor {
    /// Store holding the indexed resource vectors that queries search over.
    vector_store: VectorStore,
    /// Produces embeddings for free-text queries and newly added resources.
    embedding_manager: EmbeddingManager,
    /// Results of previous queries, keyed by `VectorQuery::cache_key()`.
    query_cache: HashMap<String, VectorQueryResult>,
    /// Flags selecting which optimizations are applied before execution.
    optimizer: VectorQueryOptimizer,
    /// Optional sink for query timings and cache statistics.
    performance_monitor: Option<PerformanceMonitor>,
    /// Expands a text query into weighted per-language variations.
    cross_language_processor: CrossLanguageProcessor,
    /// Optional graph-aware search engine; its presence is checked by `searchIn` queries.
    graph_aware_search: Option<GraphAwareSearch>,
}
26
27impl QueryExecutor {
28 pub fn new(
29 vector_store: VectorStore,
30 embedding_manager: EmbeddingManager,
31 optimizer: VectorQueryOptimizer,
32 performance_monitor: Option<PerformanceMonitor>,
33 graph_aware_search: Option<GraphAwareSearch>,
34 ) -> Self {
35 Self {
36 vector_store,
37 embedding_manager,
38 query_cache: HashMap::new(),
39 optimizer,
40 performance_monitor,
41 cross_language_processor: CrossLanguageProcessor::new(),
42 graph_aware_search,
43 }
44 }
45
46 pub fn execute_optimized_query(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
48 let start_time = Instant::now();
49
50 let optimized_query = if self.optimizer.enable_index_selection {
52 self.optimize_query(query)?
53 } else {
54 query.clone()
55 };
56
57 let result = self.execute_query_internal(&optimized_query);
59
60 let duration = start_time.elapsed();
62 if let Some(ref monitor) = self.performance_monitor {
63 monitor.record_query(duration, result.is_ok());
64 monitor.record_operation(&format!("query_{}", query.operation_type), duration);
65 }
66
67 result
68 }
69
70 fn optimize_query(&self, query: &VectorQuery) -> Result<VectorQuery> {
72 let mut optimized = query.clone();
73
74 if self.optimizer.enable_index_selection {
76 optimized.preferred_index = self.select_optimal_index(query)?;
77 }
78
79 if self.optimizer.enable_caching {
81 optimized.use_cache = true;
82 }
83
84 if self.optimizer.enable_parallel_execution && query.can_parallelize() {
86 optimized.parallel_execution = true;
87 }
88
89 Ok(optimized)
90 }
91
92 fn select_optimal_index(&self, query: &VectorQuery) -> Result<Option<String>> {
94 match query.operation_type.as_str() {
95 "similarity_search" => {
96 if query.estimated_result_size.unwrap_or(0) > 1000 {
98 Ok(Some("hnsw".to_string()))
99 } else {
100 Ok(Some("memory".to_string()))
101 }
102 }
103 "threshold_search" => {
104 Ok(Some("lsh".to_string()))
106 }
107 _ => Ok(None),
108 }
109 }
110
111 fn execute_query_internal(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
113 if query.use_cache {
115 if let Some(cached_result) = self.get_cached_result(&query.cache_key()) {
116 if let Some(ref monitor) = self.performance_monitor {
117 monitor.record_cache_hit();
118 }
119 return Ok(cached_result.from_cache());
120 } else if let Some(ref monitor) = self.performance_monitor {
121 monitor.record_cache_miss();
122 }
123 }
124
125 let start_time = Instant::now();
126 let result = match query.operation_type.as_str() {
127 "similarity" => self.execute_similarity_query(query),
128 "similar" => self.execute_similar_query(query),
129 "search" | "search_text" => self.execute_search_query(query),
130 "searchIn" => self.execute_search_in_query(query),
131 "cluster" => self.execute_cluster_query(query),
132 "embed" | "embed_text" => self.execute_embed_query(query),
133 _ => Err(anyhow!("Unknown operation type: {}", query.operation_type)),
134 }?;
135
136 let execution_time = start_time.elapsed();
137 let query_result = VectorQueryResult::new(result, execution_time);
138
139 if query.use_cache {
141 self.cache_result(query.cache_key(), query_result.clone());
142 }
143
144 Ok(query_result)
145 }
146
147 fn execute_similarity_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
149 if query.args.len() < 2 {
150 return Err(anyhow!("Similarity query requires at least 2 arguments"));
151 }
152
153 let resource1 = match &query.args[0] {
154 VectorServiceArg::IRI(iri) => iri,
155 _ => return Err(anyhow!("First argument must be an IRI")),
156 };
157
158 let resource2 = match &query.args[1] {
159 VectorServiceArg::IRI(iri) => iri,
160 _ => return Err(anyhow!("Second argument must be an IRI")),
161 };
162
163 let vector1 = self
165 .vector_store
166 .get_vector(&resource1.clone())
167 .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource1))?
168 .clone();
169 let vector2 = self
170 .vector_store
171 .get_vector(&resource2.clone())
172 .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource2))?
173 .clone();
174
175 let similarity =
177 crate::similarity::cosine_similarity(&vector1.as_slice(), &vector2.as_slice());
178
179 Ok(vec![(format!("{resource1}-{resource2}"), similarity)])
180 }
181
182 fn execute_similar_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
184 if query.args.is_empty() {
185 return Err(anyhow!("Similar query requires at least 1 argument"));
186 }
187
188 let resource = match &query.args[0] {
189 VectorServiceArg::IRI(iri) => iri,
190 _ => return Err(anyhow!("First argument must be an IRI")),
191 };
192
193 let limit = if query.args.len() > 1 {
194 match &query.args[1] {
195 VectorServiceArg::Number(n) => *n as usize,
196 _ => 10,
197 }
198 } else {
199 10
200 };
201
202 let _threshold = if query.args.len() > 2 {
203 match &query.args[2] {
204 VectorServiceArg::Number(n) => *n,
205 _ => 0.0,
206 }
207 } else {
208 0.0
209 };
210
211 let query_vector = self
213 .vector_store
214 .get_vector(&resource.clone())
215 .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource))?
216 .clone();
217
218 let results = self
220 .vector_store
221 .similarity_search_vector(&query_vector, limit)?;
222
223 Ok(results
224 .into_iter()
225 .filter(|(id, _)| id != resource) .collect())
227 }
228
229 fn execute_search_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
231 if query.args.is_empty() {
232 return Err(anyhow!("Search query requires at least 1 argument"));
233 }
234
235 let query_text = match &query.args[0] {
236 VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
237 _ => return Err(anyhow!("First argument must be text")),
238 };
239
240 let limit = if query.args.len() > 1 {
241 match &query.args[1] {
242 VectorServiceArg::Number(n) => *n as usize,
243 _ => 10,
244 }
245 } else {
246 10
247 };
248
249 let threshold = if query.args.len() > 2 {
250 match &query.args[2] {
251 VectorServiceArg::Number(n) => *n,
252 _ => 0.7,
253 }
254 } else {
255 0.7
256 };
257
258 let cross_language = if query.args.len() > 4 {
260 match &query.args[4] {
261 VectorServiceArg::String(val) => val == "true",
262 _ => false,
263 }
264 } else {
265 false
266 };
267
268 let target_languages = if query.args.len() > 5 {
269 match &query.args[5] {
270 VectorServiceArg::String(langs) => langs
271 .split(',')
272 .map(|s| s.trim().to_string())
273 .collect::<Vec<_>>(),
274 _ => vec!["en".to_string()],
275 }
276 } else {
277 vec!["en".to_string()]
278 };
279
280 if cross_language {
281 self.execute_cross_language_search(query_text, limit, threshold, &target_languages)
282 } else {
283 self.execute_simple_text_search(query_text, limit, threshold)
284 }
285 }
286
287 fn execute_simple_text_search(
289 &mut self,
290 query_text: &str,
291 limit: usize,
292 _threshold: f32,
293 ) -> Result<Vec<(String, f32)>> {
294 let content = EmbeddableContent::Text(query_text.to_string());
296
297 let query_vector = self.embedding_manager.get_embedding(&content)?;
298
299 self.vector_store
301 .similarity_search_vector(&query_vector, limit)
302 }
303
304 fn execute_cross_language_search(
306 &mut self,
307 query_text: &str,
308 limit: usize,
309 _threshold: f32,
310 target_languages: &[String],
311 ) -> Result<Vec<(String, f32)>> {
312 let query_variations = self
314 .cross_language_processor
315 .process_cross_language_query(query_text, target_languages);
316
317 let mut all_results = Vec::new();
318
319 for (variation_text, weight) in query_variations {
321 let content = EmbeddableContent::Text(variation_text);
322
323 if let Ok(query_vector) = self.embedding_manager.get_embedding(&content) {
324 if let Ok(results) = self
325 .vector_store
326 .similarity_search_vector(&query_vector, limit)
327 {
328 for (id, score) in results {
329 all_results.push((id, score * weight));
330 }
331 }
332 }
333 }
334
335 let merged_results = self.merge_search_results(all_results, limit);
337 Ok(merged_results)
338 }
339
340 fn execute_search_in_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
342 if query.args.len() < 2 {
343 return Err(anyhow!("SearchIn query requires at least 2 arguments"));
344 }
345
346 let query_text = match &query.args[0] {
347 VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
348 _ => return Err(anyhow!("First argument must be query text")),
349 };
350
351 let graph_iri = match &query.args[1] {
352 VectorServiceArg::IRI(iri) => iri,
353 _ => return Err(anyhow!("Second argument must be a graph IRI")),
354 };
355
356 let limit = if query.args.len() > 2 {
357 match &query.args[2] {
358 VectorServiceArg::Number(n) => *n as usize,
359 _ => 10,
360 }
361 } else {
362 10
363 };
364
365 let scope_str = if query.args.len() > 3 {
366 match &query.args[3] {
367 VectorServiceArg::String(s) => s.as_str(),
368 _ => "exact",
369 }
370 } else {
371 "exact"
372 };
373
374 let threshold = if query.args.len() > 4 {
375 match &query.args[4] {
376 VectorServiceArg::Number(n) => *n,
377 _ => 0.7,
378 }
379 } else {
380 0.7
381 };
382
383 let scope = match scope_str {
385 "children" => GraphSearchScope::IncludeChildren,
386 "parents" => GraphSearchScope::IncludeParents,
387 "hierarchy" => GraphSearchScope::FullHierarchy,
388 "related" => GraphSearchScope::Related,
389 _ => GraphSearchScope::Exact,
390 };
391
392 if let Some(ref _graph_search) = self.graph_aware_search {
393 let _context = GraphContext {
394 primary_graph: graph_iri.clone(),
395 additional_graphs: Vec::new(),
396 scope,
397 context_weights: HashMap::new(),
398 };
399
400 let content = EmbeddableContent::Text(query_text.to_string());
402 let _query_vector = self.embedding_manager.get_embedding(&content)?;
403
404 self.execute_simple_text_search(query_text, limit, threshold)
406 } else {
407 self.execute_simple_text_search(query_text, limit, threshold)
409 }
410 }
411
412 fn execute_cluster_query(&self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
429 let num_clusters = if query.args.is_empty() {
431 3usize
432 } else {
433 match &query.args[0] {
434 VectorServiceArg::Number(n) => (*n as usize).max(1),
435 _ => 3,
436 }
437 };
438
439 let algorithm = if query.args.len() > 1 {
440 match &query.args[1] {
441 VectorServiceArg::String(s) | VectorServiceArg::Literal(s) => match s.as_str() {
442 "dbscan" => ClusteringAlgorithm::DBSCAN,
443 "hierarchical" => ClusteringAlgorithm::Hierarchical,
444 "spectral" => ClusteringAlgorithm::Spectral,
445 "community" => ClusteringAlgorithm::Community,
446 "similarity" => ClusteringAlgorithm::Similarity,
447 _ => ClusteringAlgorithm::KMeans,
448 },
449 _ => ClusteringAlgorithm::KMeans,
450 }
451 } else {
452 ClusteringAlgorithm::KMeans
453 };
454
455 let similarity_threshold = if query.args.len() > 2 {
456 match &query.args[2] {
457 VectorServiceArg::Number(n) => *n,
458 _ => 0.7,
459 }
460 } else {
461 0.7
462 };
463
464 let resources: Vec<(String, crate::Vector)> = self.vector_store.iter_vectors();
466
467 if resources.is_empty() {
468 return Ok(Vec::new());
469 }
470
471 let config = ClusteringConfig {
473 algorithm,
474 num_clusters: Some(num_clusters),
475 similarity_threshold,
476 ..ClusteringConfig::default()
477 };
478
479 let engine = ClusteringEngine::new(config);
480 let clustering_result = engine.cluster(&resources)?;
481
482 let mut output: Vec<(String, f32)> = Vec::new();
484 for cluster in &clustering_result.clusters {
485 let cluster_score = cluster.id as f32;
486 for member in &cluster.members {
487 output.push((member.clone(), cluster_score));
488 }
489 }
490
491 Ok(output)
492 }
493
494 fn execute_embed_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
496 if query.args.is_empty() {
497 return Err(anyhow!("Embed query requires at least 1 argument"));
498 }
499
500 let text = match &query.args[0] {
501 VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
502 _ => return Err(anyhow!("First argument must be text")),
503 };
504
505 let content = EmbeddableContent::Text(text.to_string());
506
507 let vector = self.embedding_manager.get_embedding(&content)?;
508
509 let id = format!("embedded_{}", hash_string(text));
511 self.vector_store.index_vector(id.clone(), vector)?;
512
513 Ok(vec![(id, 1.0)])
514 }
515
516 fn merge_search_results(
518 &self,
519 results: Vec<(String, f32)>,
520 limit: usize,
521 ) -> Vec<(String, f32)> {
522 let mut result_map: HashMap<String, f32> = HashMap::new();
523
524 for (id, score) in results {
526 result_map
527 .entry(id)
528 .and_modify(|existing_score| *existing_score = existing_score.max(score))
529 .or_insert(score);
530 }
531
532 let mut merged: Vec<(String, f32)> = result_map.into_iter().collect();
534 merged.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
535
536 merged.truncate(limit);
538 merged
539 }
540
541 fn get_cached_result(&self, cache_key: &str) -> Option<VectorQueryResult> {
543 self.query_cache.get(cache_key).cloned()
544 }
545
546 fn cache_result(&mut self, cache_key: String, result: VectorQueryResult) {
548 if self.query_cache.len() >= 1000 {
550 if let Some(first_key) = self.query_cache.keys().next().cloned() {
552 self.query_cache.remove(&first_key);
553 }
554 }
555 self.query_cache.insert(cache_key, result);
556
557 if let Some(ref monitor) = self.performance_monitor {
559 monitor.update_cache_size(self.query_cache.len(), 1000);
560 }
561 }
562
563 pub fn clear_cache(&mut self) {
565 self.query_cache.clear();
566 if let Some(ref monitor) = self.performance_monitor {
567 monitor.update_cache_size(0, 1000);
568 }
569 }
570
571 pub fn cache_stats(&self) -> (usize, usize) {
573 (self.query_cache.len(), 1000)
574 }
575
576 pub fn add_resource_embedding(&mut self, uri: &str, content: &EmbeddableContent) -> Result<()> {
578 let vector = self.embedding_manager.get_embedding(content)?;
580
581 self.vector_store.index_vector(uri.to_string(), vector)?;
583
584 Ok(())
585 }
586}
587
/// Hashes `s` with 64-bit FNV-1a.
///
/// The result is embedded in identifiers (`embedded_<hash>`) that are indexed
/// into the vector store, so it should be stable across builds. `DefaultHasher`
/// does not promise that: its algorithm may change between Rust releases.
/// FNV-1a is fully specified, so the same text always maps to the same id.
fn hash_string(s: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    s.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
597
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embeddings::EmbeddingStrategy;
    use anyhow::Result;

    /// Wraps `store` in an executor that uses a TF-IDF embedding manager, the
    /// default optimizer, and neither a monitor nor graph-aware search.
    fn executor_for(store: VectorStore) -> Result<QueryExecutor> {
        let embeddings = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?;
        let optimizer = VectorQueryOptimizer::default();
        Ok(QueryExecutor::new(store, embeddings, optimizer, None, None))
    }

    #[test]
    fn test_query_optimization() -> Result<()> {
        let executor = executor_for(VectorStore::new())?;

        let iri = |s: &str| VectorServiceArg::IRI(s.to_string());
        let query = VectorQuery::new(
            "similarity_search".to_string(),
            vec![
                iri("http://example.org/resource1"),
                iri("http://example.org/resource2"),
            ],
        );

        let optimized = executor.optimize_query(&query)?;
        assert!(optimized.use_cache);
        Ok(())
    }

    #[test]
    fn test_cache_key_generation() {
        // Two independently built but identical queries must share a cache key.
        let build = || {
            VectorQuery::new(
                "search".to_string(),
                vec![VectorServiceArg::String("test".to_string())],
            )
        };
        let (first, second) = (build(), build());

        assert_eq!(first.cache_key(), second.cache_key());
    }

    #[test]
    fn test_merge_search_results() -> Result<()> {
        let executor = executor_for(VectorStore::new())?;

        // "doc1" appears twice; the merge should keep its higher score (0.8).
        let raw: Vec<(String, f32)> = [("doc1", 0.8), ("doc2", 0.9), ("doc1", 0.7), ("doc3", 0.6)]
            .iter()
            .map(|&(id, score)| (id.to_string(), score))
            .collect();

        let merged = executor.merge_search_results(raw, 10);

        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].0, "doc2");
        assert_eq!(merged[1].1, 0.8);
        Ok(())
    }

    mod cluster_test_helpers {
        use crate::{MemoryVectorIndex, Vector, VectorIndex as _};

        /// Builds an in-memory index with `n_clusters` groups of `n_per_cluster`
        /// vectors. Members of group `c` carry a large value on axis `c` and
        /// 0.001 on every other axis; the dimension is at least 4.
        pub fn build_clustered_index(
            n_clusters: usize,
            n_per_cluster: usize,
        ) -> Box<dyn crate::VectorIndex> {
            let dim = n_clusters.max(4);
            let mut index = MemoryVectorIndex::new();

            let members = (0..n_clusters)
                .flat_map(|cluster| (0..n_per_cluster).map(move |member| (cluster, member)));
            for (cluster, member) in members {
                let mut coords = vec![0.001f32; dim];
                coords[cluster] = 10.0 + (member as f32) * 0.01;
                index
                    .insert(
                        format!("cluster{cluster}_member{member}"),
                        Vector::new(coords),
                    )
                    .expect("insert ok");
            }

            Box::new(index)
        }
    }

    #[test]
    fn test_cluster_query_happy_path() -> Result<()> {
        use cluster_test_helpers::build_clustered_index;
        use std::collections::HashSet;

        let (n_clusters, n_per_cluster) = (3usize, 3usize);

        let executor = executor_for(VectorStore::with_index(build_clustered_index(
            n_clusters,
            n_per_cluster,
        )))?;

        let query = VectorQuery::new(
            "cluster".to_string(),
            vec![
                VectorServiceArg::Number(n_clusters as f32),
                VectorServiceArg::String("kmeans".to_string()),
            ],
        );

        let results = executor.execute_cluster_query(&query)?;

        assert_eq!(
            results.len(),
            n_clusters * n_per_cluster,
            "all members must be returned"
        );

        let distinct: HashSet<u32> = results.iter().map(|&(_, cid)| cid as u32).collect();
        let mut cluster_ids: Vec<u32> = distinct.into_iter().collect();
        cluster_ids.sort_unstable();
        assert_eq!(
            cluster_ids.len(),
            n_clusters,
            "expected {n_clusters} distinct cluster ids, got {:?}",
            cluster_ids
        );

        Ok(())
    }

    #[test]
    fn test_cluster_query_empty_store() -> Result<()> {
        let executor = executor_for(VectorStore::new())?;

        let query = VectorQuery::new("cluster".to_string(), vec![VectorServiceArg::Number(3.0)]);

        let results = executor.execute_cluster_query(&query)?;
        assert!(
            results.is_empty(),
            "empty store must yield empty cluster result"
        );
        Ok(())
    }

    #[test]
    fn test_cluster_query_invalid_k() -> Result<()> {
        use cluster_test_helpers::build_clustered_index;

        // Only two vectors in total, but five clusters are requested.
        let (n_clusters, n_per_cluster) = (1usize, 2usize);

        let executor = executor_for(VectorStore::with_index(build_clustered_index(
            n_clusters,
            n_per_cluster,
        )))?;

        let query = VectorQuery::new(
            "cluster".to_string(),
            vec![
                VectorServiceArg::Number(5.0),
                VectorServiceArg::String("kmeans".to_string()),
            ],
        );

        let outcome = executor.execute_cluster_query(&query);
        assert!(
            outcome.is_err(),
            "k >= n should produce an error from the clustering engine"
        );
        Ok(())
    }
}