oxirs_vec/sparql_integration/query_executor.rs

//! Core query execution and optimization for SPARQL vector operations

use super::config::{VectorQuery, VectorQueryOptimizer, VectorQueryResult, VectorServiceArg};
use super::cross_language::CrossLanguageProcessor;
use super::monitoring::PerformanceMonitor;
use crate::{
    embeddings::{EmbeddableContent, EmbeddingManager},
    graph_aware_search::{GraphAwareSearch, GraphContext, GraphSearchScope},
    VectorStore,
};
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::time::Instant;

/// Core query execution engine for vector operations
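///
/// # Example (sketch)
///
/// Minimal construction, mirroring the tests at the bottom of this file;
/// `EmbeddingStrategy::TfIdf` with dimension 100 is just an illustration:
///
/// ```ignore
/// let executor = QueryExecutor::new(
///     VectorStore::new(),
///     EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?,
///     VectorQueryOptimizer::default(),
///     None, // no performance monitoring
///     None, // no graph-aware search
/// );
/// ```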
pub struct QueryExecutor {
    vector_store: VectorStore,
    embedding_manager: EmbeddingManager,
    query_cache: HashMap<String, VectorQueryResult>,
    optimizer: VectorQueryOptimizer,
    performance_monitor: Option<PerformanceMonitor>,
    cross_language_processor: CrossLanguageProcessor,
    graph_aware_search: Option<GraphAwareSearch>,
}

impl QueryExecutor {
    pub fn new(
        vector_store: VectorStore,
        embedding_manager: EmbeddingManager,
        optimizer: VectorQueryOptimizer,
        performance_monitor: Option<PerformanceMonitor>,
        graph_aware_search: Option<GraphAwareSearch>,
    ) -> Self {
        Self {
            vector_store,
            embedding_manager,
            query_cache: HashMap::new(),
            optimizer,
            performance_monitor,
            cross_language_processor: CrossLanguageProcessor::new(),
            graph_aware_search,
        }
    }

    /// Execute query with performance monitoring and optimization
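    ///
    /// # Example (sketch)
    ///
    /// A minimal call, using `VectorQuery::new` and `VectorServiceArg` exactly
    /// as the tests below do; the query text is illustrative:
    ///
    /// ```ignore
    /// let query = VectorQuery::new(
    ///     "search".to_string(),
    ///     vec![VectorServiceArg::String("neural networks".to_string())],
    /// );
    /// let result = executor.execute_optimized_query(&query)?;
    /// ```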
    pub fn execute_optimized_query(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
        let start_time = Instant::now();

        // Apply query optimization if enabled
        let optimized_query = if self.optimizer.enable_index_selection {
            self.optimize_query(query)?
        } else {
            query.clone()
        };

        // Execute the query
        let result = self.execute_query_internal(&optimized_query);

        // Record performance metrics
        let duration = start_time.elapsed();
        if let Some(ref monitor) = self.performance_monitor {
            monitor.record_query(duration, result.is_ok());
            monitor.record_operation(&format!("query_{}", query.operation_type), duration);
        }

        result
    }

    /// Optimize query for better performance
    fn optimize_query(&self, query: &VectorQuery) -> Result<VectorQuery> {
        let mut optimized = query.clone();

        // Index selection optimization
        if self.optimizer.enable_index_selection {
            optimized.preferred_index = self.select_optimal_index(query)?;
        }

        // Caching optimization
        if self.optimizer.enable_caching {
            optimized.use_cache = true;
        }

        // Parallel execution optimization
        if self.optimizer.enable_parallel_execution && query.can_parallelize() {
            optimized.parallel_execution = true;
        }

        Ok(optimized)
    }

    /// Select optimal index for query execution
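    ///
    /// Heuristic only: large similarity searches prefer the approximate HNSW
    /// index, small ones a flat in-memory scan, and threshold search maps to
    /// LSH; unknown operation types express no preference.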
    fn select_optimal_index(&self, query: &VectorQuery) -> Result<Option<String>> {
        match query.operation_type.as_str() {
            "similarity_search" => {
                // For similarity search, an approximate index is usually better
                // for large result sets
                if query.estimated_result_size.unwrap_or(0) > 1000 {
                    Ok(Some("hnsw".to_string()))
                } else {
                    Ok(Some("memory".to_string()))
                }
            }
            "threshold_search" => {
                // Threshold search benefits from approximate indices
                Ok(Some("lsh".to_string()))
            }
            _ => Ok(None),
        }
    }

    /// Execute the query, consulting and populating the cache when enabled
    fn execute_query_internal(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
        // Check cache first if enabled
        if query.use_cache {
            if let Some(cached_result) = self.get_cached_result(&query.cache_key()) {
                if let Some(ref monitor) = self.performance_monitor {
                    monitor.record_cache_hit();
                }
                return Ok(cached_result.from_cache());
            } else if let Some(ref monitor) = self.performance_monitor {
                monitor.record_cache_miss();
            }
        }

        let start_time = Instant::now();
        let result = match query.operation_type.as_str() {
            "similarity" => self.execute_similarity_query(query),
            "similar" => self.execute_similar_query(query),
            "search" | "search_text" => self.execute_search_query(query),
            "searchIn" => self.execute_search_in_query(query),
            "cluster" => self.execute_cluster_query(query),
            "embed" | "embed_text" => self.execute_embed_query(query),
            _ => Err(anyhow!("Unknown operation type: {}", query.operation_type)),
        }?;

        let execution_time = start_time.elapsed();
        let query_result = VectorQueryResult::new(result, execution_time);

        // Cache the result if caching is enabled
        if query.use_cache {
            self.cache_result(query.cache_key(), query_result.clone());
        }

        Ok(query_result)
    }

    /// Execute similarity query between two resources
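    ///
    /// Expects `args[0]` and `args[1]` to be IRIs; returns a single
    /// `("iri1-iri2", cosine_score)` pair.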
    fn execute_similarity_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
        if query.args.len() < 2 {
            return Err(anyhow!("Similarity query requires at least 2 arguments"));
        }

        let resource1 = match &query.args[0] {
            VectorServiceArg::IRI(iri) => iri,
            _ => return Err(anyhow!("First argument must be an IRI")),
        };

        let resource2 = match &query.args[1] {
            VectorServiceArg::IRI(iri) => iri,
            _ => return Err(anyhow!("Second argument must be an IRI")),
        };

        // Get vectors for both resources
        let vector1 = self
            .vector_store
            .get_vector(resource1)
            .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource1))?
            .clone();
        let vector2 = self
            .vector_store
            .get_vector(resource2)
            .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource2))?
            .clone();

        // Calculate similarity
        let similarity =
            crate::similarity::cosine_similarity(vector1.as_slice(), vector2.as_slice());

        Ok(vec![(format!("{resource1}-{resource2}"), similarity)])
    }

    /// Execute similar query to find similar resources
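    ///
    /// Expects `args[0]` as an IRI, with an optional result limit in
    /// `args[1]` (default 10) and a threshold in `args[2]` (parsed but not
    /// yet applied). The query resource itself is filtered out of the k-NN
    /// results, so the output may hold one fewer entry than the limit.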
    fn execute_similar_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
        if query.args.is_empty() {
            return Err(anyhow!("Similar query requires at least 1 argument"));
        }

        let resource = match &query.args[0] {
            VectorServiceArg::IRI(iri) => iri,
            _ => return Err(anyhow!("First argument must be an IRI")),
        };

        let limit = if query.args.len() > 1 {
            match &query.args[1] {
                VectorServiceArg::Number(n) => *n as usize,
                _ => 10,
            }
        } else {
            10
        };

        let _threshold = if query.args.len() > 2 {
            match &query.args[2] {
                VectorServiceArg::Number(n) => *n,
                _ => 0.0,
            }
        } else {
            0.0
        };

        // Get vector for the resource
        let query_vector = self
            .vector_store
            .get_vector(resource)
            .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource))?
            .clone();

        // Perform similarity search
        let results = self.vector_store.index.search_knn(&query_vector, limit)?;

        Ok(results
            .into_iter()
            .filter(|(id, _)| id != resource) // Exclude the query resource itself
            .collect())
    }

    /// Execute text search query
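    ///
    /// Expects `args[0]` as query text, with an optional limit in `args[1]`
    /// (default 10) and threshold in `args[2]` (default 0.7). `args[4]`
    /// ("true"/"false") toggles cross-language search and `args[5]` is a
    /// comma-separated language list (default "en"); `args[3]` is not
    /// inspected by this handler.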
    fn execute_search_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
        if query.args.is_empty() {
            return Err(anyhow!("Search query requires at least 1 argument"));
        }

        let query_text = match &query.args[0] {
            VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
            _ => return Err(anyhow!("First argument must be text")),
        };

        let limit = if query.args.len() > 1 {
            match &query.args[1] {
                VectorServiceArg::Number(n) => *n as usize,
                _ => 10,
            }
        } else {
            10
        };

        let threshold = if query.args.len() > 2 {
            match &query.args[2] {
                VectorServiceArg::Number(n) => *n,
                _ => 0.7,
            }
        } else {
            0.7
        };

        // Check for cross-language search parameters
        let cross_language = if query.args.len() > 4 {
            match &query.args[4] {
                VectorServiceArg::String(val) => val == "true",
                _ => false,
            }
        } else {
            false
        };

        let target_languages = if query.args.len() > 5 {
            match &query.args[5] {
                VectorServiceArg::String(langs) => langs
                    .split(',')
                    .map(|s| s.trim().to_string())
                    .collect::<Vec<_>>(),
                _ => vec!["en".to_string()],
            }
        } else {
            vec!["en".to_string()]
        };

        if cross_language {
            self.execute_cross_language_search(query_text, limit, threshold, &target_languages)
        } else {
            self.execute_simple_text_search(query_text, limit, threshold)
        }
    }

    /// Execute simple text search
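    ///
    /// Note: `_threshold` is accepted but not yet applied; results are plain
    /// k-NN over the embedded query text.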
    fn execute_simple_text_search(
        &mut self,
        query_text: &str,
        limit: usize,
        _threshold: f32,
    ) -> Result<Vec<(String, f32)>> {
        // Generate embedding for the query text
        let content = EmbeddableContent::Text(query_text.to_string());

        let query_vector = self.embedding_manager.get_embedding(&content)?;

        // Perform similarity search
        self.vector_store.index.search_knn(&query_vector, limit)
    }

    /// Execute cross-language search
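    ///
    /// Each language variation is embedded and searched independently, its
    /// scores are scaled by the variation weight, and duplicate IDs keep the
    /// maximum weighted score (see `merge_search_results`).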
    fn execute_cross_language_search(
        &mut self,
        query_text: &str,
        limit: usize,
        _threshold: f32,
        target_languages: &[String],
    ) -> Result<Vec<(String, f32)>> {
        // Process query with cross-language variations
        let query_variations = self
            .cross_language_processor
            .process_cross_language_query(query_text, target_languages);

        let mut all_results = Vec::new();

        // Execute search for each query variation
        for (variation_text, weight) in query_variations {
            let content = EmbeddableContent::Text(variation_text);

            if let Ok(query_vector) = self.embedding_manager.get_embedding(&content) {
                if let Ok(results) = self.vector_store.index.search_knn(&query_vector, limit) {
                    for (id, score) in results {
                        all_results.push((id, score * weight));
                    }
                }
            }
        }

        // Merge and deduplicate results
        let merged_results = self.merge_search_results(all_results, limit);
        Ok(merged_results)
    }

    /// Execute graph-scoped search query
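    ///
    /// Expects `args[0]` as query text and `args[1]` as a graph IRI, with an
    /// optional limit in `args[2]` (default 10), scope in `args[3]`
    /// ("exact", "children", "parents", "hierarchy" or "related") and
    /// threshold in `args[4]` (default 0.7).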
    fn execute_search_in_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
        if query.args.len() < 2 {
            return Err(anyhow!("SearchIn query requires at least 2 arguments"));
        }

        let query_text = match &query.args[0] {
            VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
            _ => return Err(anyhow!("First argument must be query text")),
        };

        let graph_iri = match &query.args[1] {
            VectorServiceArg::IRI(iri) => iri,
            _ => return Err(anyhow!("Second argument must be a graph IRI")),
        };

        let limit = if query.args.len() > 2 {
            match &query.args[2] {
                VectorServiceArg::Number(n) => *n as usize,
                _ => 10,
            }
        } else {
            10
        };

        let scope_str = if query.args.len() > 3 {
            match &query.args[3] {
                VectorServiceArg::String(s) => s.as_str(),
                _ => "exact",
            }
        } else {
            "exact"
        };

        let threshold = if query.args.len() > 4 {
            match &query.args[4] {
                VectorServiceArg::Number(n) => *n,
                _ => 0.7,
            }
        } else {
            0.7
        };

        // Convert scope string to enum
        let scope = match scope_str {
            "children" => GraphSearchScope::IncludeChildren,
            "parents" => GraphSearchScope::IncludeParents,
            "hierarchy" => GraphSearchScope::FullHierarchy,
            "related" => GraphSearchScope::Related,
            _ => GraphSearchScope::Exact,
        };

        if let Some(ref _graph_search) = self.graph_aware_search {
            let _context = GraphContext {
                primary_graph: graph_iri.clone(),
                additional_graphs: Vec::new(),
                scope,
                context_weights: HashMap::new(),
            };

            // Generate embedding for query text
            let content = EmbeddableContent::Text(query_text.to_string());
            let _query_vector = self.embedding_manager.get_embedding(&content)?;

            // `search_with_context` does not exist yet, so fall back to simple search
            self.execute_simple_text_search(query_text, limit, threshold)
        } else {
            // Fall back to simple search if graph-aware search is not available
            self.execute_simple_text_search(query_text, limit, threshold)
        }
    }

    /// Execute clustering query
    fn execute_cluster_query(&self, _query: &VectorQuery) -> Result<Vec<(String, f32)>> {
        // Placeholder; a real implementation would run a clustering
        // algorithm over the stored vectors
        Err(anyhow!("Clustering not yet implemented"))
    }

    /// Execute embedding generation query
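    ///
    /// Embeds `args[0]`, stores the vector under a generated
    /// `embedded_<hash>` ID, and returns that ID with a score of 1.0.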
    fn execute_embed_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
        if query.args.is_empty() {
            return Err(anyhow!("Embed query requires at least 1 argument"));
        }

        let text = match &query.args[0] {
            VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
            _ => return Err(anyhow!("First argument must be text")),
        };

        let content = EmbeddableContent::Text(text.to_string());

        let vector = self.embedding_manager.get_embedding(&content)?;

        // Store the vector with a generated ID
        let id = format!("embedded_{}", hash_string(text));
        self.vector_store
            .index
            .add_vector(id.clone(), vector, None)?;

        Ok(vec![(id, 1.0)])
    }

    /// Merge and deduplicate search results
    fn merge_search_results(
        &self,
        results: Vec<(String, f32)>,
        limit: usize,
    ) -> Vec<(String, f32)> {
        let mut result_map: HashMap<String, f32> = HashMap::new();

        // Aggregate scores for duplicate IDs (take maximum score)
        for (id, score) in results {
            result_map
                .entry(id)
                .and_modify(|existing_score| *existing_score = existing_score.max(score))
                .or_insert(score);
        }

        // Convert to vector and sort by score, descending
        let mut merged: Vec<(String, f32)> = result_map.into_iter().collect();
        merged.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        // Apply limit
        merged.truncate(limit);
        merged
    }

    /// Get cached result
    fn get_cached_result(&self, cache_key: &str) -> Option<VectorQueryResult> {
        self.query_cache.get(cache_key).cloned()
    }

    /// Cache query result
    fn cache_result(&mut self, cache_key: String, result: VectorQueryResult) {
        // Simple cache with fixed size (in a real implementation, use LRU or similar)
        if self.query_cache.len() >= 1000 {
            // Evict an arbitrary entry (HashMap iteration order is not insertion order)
            if let Some(first_key) = self.query_cache.keys().next().cloned() {
                self.query_cache.remove(&first_key);
            }
        }
        self.query_cache.insert(cache_key, result);

        // Update cache statistics
        if let Some(ref monitor) = self.performance_monitor {
            monitor.update_cache_size(self.query_cache.len(), 1000);
        }
    }

    /// Clear query cache
    pub fn clear_cache(&mut self) {
        self.query_cache.clear();
        if let Some(ref monitor) = self.performance_monitor {
            monitor.update_cache_size(0, 1000);
        }
    }

    /// Get cache statistics
    pub fn cache_stats(&self) -> (usize, usize) {
        (self.query_cache.len(), 1000)
    }

    /// Add a resource embedding to the vector store
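    ///
    /// # Example (sketch)
    ///
    /// The URI and text are illustrative:
    ///
    /// ```ignore
    /// executor.add_resource_embedding(
    ///     "http://example.org/resource1",
    ///     &EmbeddableContent::Text("a short description".to_string()),
    /// )?;
    /// ```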
    pub fn add_resource_embedding(&mut self, uri: &str, content: &EmbeddableContent) -> Result<()> {
        // Generate embedding for the content
        let vector = self.embedding_manager.get_embedding(content)?;

        // Insert the vector into the store with the URI as the key
        self.vector_store.index.insert(uri.to_string(), vector)?;

        Ok(())
    }
}

/// Simple string hashing function
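///
/// Note: `DefaultHasher::new()` is seeded with fixed keys, so output is
/// deterministic within a build, but the algorithm is not guaranteed stable
/// across Rust releases; avoid persisting these IDs long-term.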
fn hash_string(s: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    s.hash(&mut hasher);
    hasher.finish()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::embeddings::EmbeddingStrategy;

    #[test]
    fn test_query_optimization() {
        let vector_store = VectorStore::new();
        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100).unwrap();
        let optimizer = VectorQueryOptimizer::default();

        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);

        let query = VectorQuery::new(
            "similarity_search".to_string(),
            vec![
                VectorServiceArg::IRI("http://example.org/resource1".to_string()),
                VectorServiceArg::IRI("http://example.org/resource2".to_string()),
            ],
        );

        let optimized = executor.optimize_query(&query).unwrap();
        assert!(optimized.use_cache);
    }

    #[test]
    fn test_cache_key_generation() {
        let query1 = VectorQuery::new(
            "search".to_string(),
            vec![VectorServiceArg::String("test".to_string())],
        );

        let query2 = VectorQuery::new(
            "search".to_string(),
            vec![VectorServiceArg::String("test".to_string())],
        );

        assert_eq!(query1.cache_key(), query2.cache_key());
    }

    #[test]
    fn test_merge_search_results() {
        let vector_store = VectorStore::new();
        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100).unwrap();
        let optimizer = VectorQueryOptimizer::default();

        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);

        let results = vec![
            ("doc1".to_string(), 0.8),
            ("doc2".to_string(), 0.9),
            ("doc1".to_string(), 0.7), // Duplicate with lower score
            ("doc3".to_string(), 0.6),
        ];

        let merged = executor.merge_search_results(results, 10);

        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].0, "doc2"); // Highest score first
        assert_eq!(merged[1].1, 0.8); // doc1 should have max score of 0.8
    }
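
    #[test]
    fn test_hash_string_is_deterministic() {
        // Added check (sketch): `DefaultHasher::new()` is seeded with fixed
        // keys, so equal inputs must yield equal `embedded_<hash>` IDs in
        // `execute_embed_query` within one build.
        assert_eq!(hash_string("test"), hash_string("test"));
    }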
}