1use super::config::{VectorQuery, VectorQueryOptimizer, VectorQueryResult, VectorServiceArg};
4use super::cross_language::CrossLanguageProcessor;
5use super::monitoring::PerformanceMonitor;
6use crate::{
7 embeddings::{EmbeddableContent, EmbeddingManager},
8 graph_aware_search::{GraphAwareSearch, GraphContext, GraphSearchScope},
9 VectorStore,
10};
11use anyhow::{anyhow, Result};
12use std::collections::HashMap;
13use std::time::Instant;
14
/// Executes vector-service queries against a [`VectorStore`], with optional
/// query optimization, result caching, performance monitoring, cross-language
/// query expansion, and graph-aware search support.
pub struct QueryExecutor {
    // Backing store holding resource vectors and the search index.
    vector_store: VectorStore,
    // Produces embeddings for free text (used by search/embed operations).
    embedding_manager: EmbeddingManager,
    // Result cache keyed by the query's cache key; capped at 1000 entries
    // by `cache_result`.
    query_cache: HashMap<String, VectorQueryResult>,
    // Feature flags controlling index selection, caching, and parallelism.
    optimizer: VectorQueryOptimizer,
    // Optional metrics sink; when `None`, no timings or cache stats are recorded.
    performance_monitor: Option<PerformanceMonitor>,
    // Expands a query into weighted per-language variations.
    cross_language_processor: CrossLanguageProcessor,
    // Optional graph-aware search support; currently only checked for
    // presence by `execute_search_in_query`, never invoked.
    graph_aware_search: Option<GraphAwareSearch>,
}
25
26impl QueryExecutor {
27 pub fn new(
28 vector_store: VectorStore,
29 embedding_manager: EmbeddingManager,
30 optimizer: VectorQueryOptimizer,
31 performance_monitor: Option<PerformanceMonitor>,
32 graph_aware_search: Option<GraphAwareSearch>,
33 ) -> Self {
34 Self {
35 vector_store,
36 embedding_manager,
37 query_cache: HashMap::new(),
38 optimizer,
39 performance_monitor,
40 cross_language_processor: CrossLanguageProcessor::new(),
41 graph_aware_search,
42 }
43 }
44
45 pub fn execute_optimized_query(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
47 let start_time = Instant::now();
48
49 let optimized_query = if self.optimizer.enable_index_selection {
51 self.optimize_query(query)?
52 } else {
53 query.clone()
54 };
55
56 let result = self.execute_query_internal(&optimized_query);
58
59 let duration = start_time.elapsed();
61 if let Some(ref monitor) = self.performance_monitor {
62 monitor.record_query(duration, result.is_ok());
63 monitor.record_operation(&format!("query_{}", query.operation_type), duration);
64 }
65
66 result
67 }
68
69 fn optimize_query(&self, query: &VectorQuery) -> Result<VectorQuery> {
71 let mut optimized = query.clone();
72
73 if self.optimizer.enable_index_selection {
75 optimized.preferred_index = self.select_optimal_index(query)?;
76 }
77
78 if self.optimizer.enable_caching {
80 optimized.use_cache = true;
81 }
82
83 if self.optimizer.enable_parallel_execution && query.can_parallelize() {
85 optimized.parallel_execution = true;
86 }
87
88 Ok(optimized)
89 }
90
91 fn select_optimal_index(&self, query: &VectorQuery) -> Result<Option<String>> {
93 match query.operation_type.as_str() {
94 "similarity_search" => {
95 if query.estimated_result_size.unwrap_or(0) > 1000 {
97 Ok(Some("hnsw".to_string()))
98 } else {
99 Ok(Some("memory".to_string()))
100 }
101 }
102 "threshold_search" => {
103 Ok(Some("lsh".to_string()))
105 }
106 _ => Ok(None),
107 }
108 }
109
110 fn execute_query_internal(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
112 if query.use_cache {
114 if let Some(cached_result) = self.get_cached_result(&query.cache_key()) {
115 if let Some(ref monitor) = self.performance_monitor {
116 monitor.record_cache_hit();
117 }
118 return Ok(cached_result.from_cache());
119 } else if let Some(ref monitor) = self.performance_monitor {
120 monitor.record_cache_miss();
121 }
122 }
123
124 let start_time = Instant::now();
125 let result = match query.operation_type.as_str() {
126 "similarity" => self.execute_similarity_query(query),
127 "similar" => self.execute_similar_query(query),
128 "search" | "search_text" => self.execute_search_query(query),
129 "searchIn" => self.execute_search_in_query(query),
130 "cluster" => self.execute_cluster_query(query),
131 "embed" | "embed_text" => self.execute_embed_query(query),
132 _ => Err(anyhow!("Unknown operation type: {}", query.operation_type)),
133 }?;
134
135 let execution_time = start_time.elapsed();
136 let query_result = VectorQueryResult::new(result, execution_time);
137
138 if query.use_cache {
140 self.cache_result(query.cache_key(), query_result.clone());
141 }
142
143 Ok(query_result)
144 }
145
146 fn execute_similarity_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
148 if query.args.len() < 2 {
149 return Err(anyhow!("Similarity query requires at least 2 arguments"));
150 }
151
152 let resource1 = match &query.args[0] {
153 VectorServiceArg::IRI(iri) => iri,
154 _ => return Err(anyhow!("First argument must be an IRI")),
155 };
156
157 let resource2 = match &query.args[1] {
158 VectorServiceArg::IRI(iri) => iri,
159 _ => return Err(anyhow!("Second argument must be an IRI")),
160 };
161
162 let vector1 = self
164 .vector_store
165 .get_vector(&resource1.clone())
166 .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource1))?
167 .clone();
168 let vector2 = self
169 .vector_store
170 .get_vector(&resource2.clone())
171 .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource2))?
172 .clone();
173
174 let similarity =
176 crate::similarity::cosine_similarity(&vector1.as_slice(), &vector2.as_slice());
177
178 Ok(vec![(format!("{resource1}-{resource2}"), similarity)])
179 }
180
181 fn execute_similar_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
183 if query.args.is_empty() {
184 return Err(anyhow!("Similar query requires at least 1 argument"));
185 }
186
187 let resource = match &query.args[0] {
188 VectorServiceArg::IRI(iri) => iri,
189 _ => return Err(anyhow!("First argument must be an IRI")),
190 };
191
192 let limit = if query.args.len() > 1 {
193 match &query.args[1] {
194 VectorServiceArg::Number(n) => *n as usize,
195 _ => 10,
196 }
197 } else {
198 10
199 };
200
201 let _threshold = if query.args.len() > 2 {
202 match &query.args[2] {
203 VectorServiceArg::Number(n) => *n,
204 _ => 0.0,
205 }
206 } else {
207 0.0
208 };
209
210 let query_vector = self
212 .vector_store
213 .get_vector(&resource.clone())
214 .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource))?
215 .clone();
216
217 let results = self.vector_store.index.search_knn(&query_vector, limit)?;
219
220 Ok(results
221 .into_iter()
222 .filter(|(id, _)| id != resource) .collect())
224 }
225
226 fn execute_search_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
228 if query.args.is_empty() {
229 return Err(anyhow!("Search query requires at least 1 argument"));
230 }
231
232 let query_text = match &query.args[0] {
233 VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
234 _ => return Err(anyhow!("First argument must be text")),
235 };
236
237 let limit = if query.args.len() > 1 {
238 match &query.args[1] {
239 VectorServiceArg::Number(n) => *n as usize,
240 _ => 10,
241 }
242 } else {
243 10
244 };
245
246 let threshold = if query.args.len() > 2 {
247 match &query.args[2] {
248 VectorServiceArg::Number(n) => *n,
249 _ => 0.7,
250 }
251 } else {
252 0.7
253 };
254
255 let cross_language = if query.args.len() > 4 {
257 match &query.args[4] {
258 VectorServiceArg::String(val) => val == "true",
259 _ => false,
260 }
261 } else {
262 false
263 };
264
265 let target_languages = if query.args.len() > 5 {
266 match &query.args[5] {
267 VectorServiceArg::String(langs) => langs
268 .split(',')
269 .map(|s| s.trim().to_string())
270 .collect::<Vec<_>>(),
271 _ => vec!["en".to_string()],
272 }
273 } else {
274 vec!["en".to_string()]
275 };
276
277 if cross_language {
278 self.execute_cross_language_search(query_text, limit, threshold, &target_languages)
279 } else {
280 self.execute_simple_text_search(query_text, limit, threshold)
281 }
282 }
283
284 fn execute_simple_text_search(
286 &mut self,
287 query_text: &str,
288 limit: usize,
289 _threshold: f32,
290 ) -> Result<Vec<(String, f32)>> {
291 let content = EmbeddableContent::Text(query_text.to_string());
293
294 let query_vector = self.embedding_manager.get_embedding(&content)?;
295
296 self.vector_store.index.search_knn(&query_vector, limit)
298 }
299
300 fn execute_cross_language_search(
302 &mut self,
303 query_text: &str,
304 limit: usize,
305 _threshold: f32,
306 target_languages: &[String],
307 ) -> Result<Vec<(String, f32)>> {
308 let query_variations = self
310 .cross_language_processor
311 .process_cross_language_query(query_text, target_languages);
312
313 let mut all_results = Vec::new();
314
315 for (variation_text, weight) in query_variations {
317 let content = EmbeddableContent::Text(variation_text);
318
319 if let Ok(query_vector) = self.embedding_manager.get_embedding(&content) {
320 if let Ok(results) = self.vector_store.index.search_knn(&query_vector, limit) {
321 for (id, score) in results {
322 all_results.push((id, score * weight));
323 }
324 }
325 }
326 }
327
328 let merged_results = self.merge_search_results(all_results, limit);
330 Ok(merged_results)
331 }
332
333 fn execute_search_in_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
335 if query.args.len() < 2 {
336 return Err(anyhow!("SearchIn query requires at least 2 arguments"));
337 }
338
339 let query_text = match &query.args[0] {
340 VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
341 _ => return Err(anyhow!("First argument must be query text")),
342 };
343
344 let graph_iri = match &query.args[1] {
345 VectorServiceArg::IRI(iri) => iri,
346 _ => return Err(anyhow!("Second argument must be a graph IRI")),
347 };
348
349 let limit = if query.args.len() > 2 {
350 match &query.args[2] {
351 VectorServiceArg::Number(n) => *n as usize,
352 _ => 10,
353 }
354 } else {
355 10
356 };
357
358 let scope_str = if query.args.len() > 3 {
359 match &query.args[3] {
360 VectorServiceArg::String(s) => s.as_str(),
361 _ => "exact",
362 }
363 } else {
364 "exact"
365 };
366
367 let threshold = if query.args.len() > 4 {
368 match &query.args[4] {
369 VectorServiceArg::Number(n) => *n,
370 _ => 0.7,
371 }
372 } else {
373 0.7
374 };
375
376 let scope = match scope_str {
378 "children" => GraphSearchScope::IncludeChildren,
379 "parents" => GraphSearchScope::IncludeParents,
380 "hierarchy" => GraphSearchScope::FullHierarchy,
381 "related" => GraphSearchScope::Related,
382 _ => GraphSearchScope::Exact,
383 };
384
385 if let Some(ref _graph_search) = self.graph_aware_search {
386 let _context = GraphContext {
387 primary_graph: graph_iri.clone(),
388 additional_graphs: Vec::new(),
389 scope,
390 context_weights: HashMap::new(),
391 };
392
393 let content = EmbeddableContent::Text(query_text.to_string());
395 let _query_vector = self.embedding_manager.get_embedding(&content)?;
396
397 self.execute_simple_text_search(query_text, limit, threshold)
399 } else {
400 self.execute_simple_text_search(query_text, limit, threshold)
402 }
403 }
404
405 fn execute_cluster_query(&self, _query: &VectorQuery) -> Result<Vec<(String, f32)>> {
407 Err(anyhow!("Clustering not yet implemented"))
410 }
411
412 fn execute_embed_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
414 if query.args.is_empty() {
415 return Err(anyhow!("Embed query requires at least 1 argument"));
416 }
417
418 let text = match &query.args[0] {
419 VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
420 _ => return Err(anyhow!("First argument must be text")),
421 };
422
423 let content = EmbeddableContent::Text(text.to_string());
424
425 let vector = self.embedding_manager.get_embedding(&content)?;
426
427 let id = format!("embedded_{}", hash_string(text));
429 self.vector_store
430 .index
431 .add_vector(id.clone(), vector, None)?;
432
433 Ok(vec![(id, 1.0)])
434 }
435
436 fn merge_search_results(
438 &self,
439 results: Vec<(String, f32)>,
440 limit: usize,
441 ) -> Vec<(String, f32)> {
442 let mut result_map: HashMap<String, f32> = HashMap::new();
443
444 for (id, score) in results {
446 result_map
447 .entry(id)
448 .and_modify(|existing_score| *existing_score = existing_score.max(score))
449 .or_insert(score);
450 }
451
452 let mut merged: Vec<(String, f32)> = result_map.into_iter().collect();
454 merged.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
455
456 merged.truncate(limit);
458 merged
459 }
460
461 fn get_cached_result(&self, cache_key: &str) -> Option<VectorQueryResult> {
463 self.query_cache.get(cache_key).cloned()
464 }
465
466 fn cache_result(&mut self, cache_key: String, result: VectorQueryResult) {
468 if self.query_cache.len() >= 1000 {
470 if let Some(first_key) = self.query_cache.keys().next().cloned() {
472 self.query_cache.remove(&first_key);
473 }
474 }
475 self.query_cache.insert(cache_key, result);
476
477 if let Some(ref monitor) = self.performance_monitor {
479 monitor.update_cache_size(self.query_cache.len(), 1000);
480 }
481 }
482
483 pub fn clear_cache(&mut self) {
485 self.query_cache.clear();
486 if let Some(ref monitor) = self.performance_monitor {
487 monitor.update_cache_size(0, 1000);
488 }
489 }
490
491 pub fn cache_stats(&self) -> (usize, usize) {
493 (self.query_cache.len(), 1000)
494 }
495
496 pub fn add_resource_embedding(&mut self, uri: &str, content: &EmbeddableContent) -> Result<()> {
498 let vector = self.embedding_manager.get_embedding(content)?;
500
501 self.vector_store.index.insert(uri.to_string(), vector)?;
503
504 Ok(())
505 }
506}
507
/// Hashes `s` with the standard library's `DefaultHasher`, producing a
/// numeric id suffix for ad-hoc embedded content. Deterministic for a given
/// standard-library implementation.
fn hash_string(s: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::default();
    s.hash(&mut state);
    state.finish()
}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embeddings::EmbeddingStrategy;

    // A similarity-search query run through the optimizer should come back
    // with caching enabled.
    #[test]
    fn test_query_optimization() {
        let store = VectorStore::new();
        let manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100).unwrap();
        let executor =
            QueryExecutor::new(store, manager, VectorQueryOptimizer::default(), None, None);

        let args = vec![
            VectorServiceArg::IRI("http://example.org/resource1".to_string()),
            VectorServiceArg::IRI("http://example.org/resource2".to_string()),
        ];
        let query = VectorQuery::new("similarity_search".to_string(), args);

        let optimized = executor.optimize_query(&query).unwrap();
        assert!(optimized.use_cache);
    }

    // Identical queries must map to identical cache keys.
    #[test]
    fn test_cache_key_generation() {
        let build = || {
            VectorQuery::new(
                "search".to_string(),
                vec![VectorServiceArg::String("test".to_string())],
            )
        };

        assert_eq!(build().cache_key(), build().cache_key());
    }

    // Duplicate ids keep their best score and results come back sorted
    // in descending score order.
    #[test]
    fn test_merge_search_results() {
        let executor = QueryExecutor::new(
            VectorStore::new(),
            EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100).unwrap(),
            VectorQueryOptimizer::default(),
            None,
            None,
        );

        let raw = vec![
            ("doc1".to_string(), 0.8),
            ("doc2".to_string(), 0.9),
            ("doc1".to_string(), 0.7),
            ("doc3".to_string(), 0.6),
        ];

        let merged = executor.merge_search_results(raw, 10);

        assert_eq!(merged.len(), 3);
        // Highest score first...
        assert_eq!(merged[0].0, "doc2");
        // ...and "doc1" keeps its best score (0.8), not its last (0.7).
        assert_eq!(merged[1].1, 0.8);
    }
}