Skip to main content

oxirs_vec/sparql_integration/
query_executor.rs

1//! Core query execution and optimization for SPARQL vector operations
2
3use super::config::{VectorQuery, VectorQueryOptimizer, VectorQueryResult, VectorServiceArg};
4use super::cross_language::CrossLanguageProcessor;
5use super::monitoring::PerformanceMonitor;
6use crate::{
7    clustering::{ClusteringAlgorithm, ClusteringConfig, ClusteringEngine},
8    embeddings::{EmbeddableContent, EmbeddingManager},
9    graph_aware_search::{GraphAwareSearch, GraphContext, GraphSearchScope},
10    VectorStore,
11};
12use anyhow::{anyhow, Result};
13use std::collections::HashMap;
14use std::time::Instant;
15
16/// Core query execution engine for vector operations
17pub struct QueryExecutor {
18    vector_store: VectorStore,
19    embedding_manager: EmbeddingManager,
20    query_cache: HashMap<String, VectorQueryResult>,
21    optimizer: VectorQueryOptimizer,
22    performance_monitor: Option<PerformanceMonitor>,
23    cross_language_processor: CrossLanguageProcessor,
24    graph_aware_search: Option<GraphAwareSearch>,
25}
26
27impl QueryExecutor {
28    pub fn new(
29        vector_store: VectorStore,
30        embedding_manager: EmbeddingManager,
31        optimizer: VectorQueryOptimizer,
32        performance_monitor: Option<PerformanceMonitor>,
33        graph_aware_search: Option<GraphAwareSearch>,
34    ) -> Self {
35        Self {
36            vector_store,
37            embedding_manager,
38            query_cache: HashMap::new(),
39            optimizer,
40            performance_monitor,
41            cross_language_processor: CrossLanguageProcessor::new(),
42            graph_aware_search,
43        }
44    }
45
46    /// Execute query with performance monitoring and optimization
47    pub fn execute_optimized_query(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
48        let start_time = Instant::now();
49
50        // Apply query optimization if enabled
51        let optimized_query = if self.optimizer.enable_index_selection {
52            self.optimize_query(query)?
53        } else {
54            query.clone()
55        };
56
57        // Execute the query
58        let result = self.execute_query_internal(&optimized_query);
59
60        // Record performance metrics
61        let duration = start_time.elapsed();
62        if let Some(ref monitor) = self.performance_monitor {
63            monitor.record_query(duration, result.is_ok());
64            monitor.record_operation(&format!("query_{}", query.operation_type), duration);
65        }
66
67        result
68    }
69
70    /// Optimize query for better performance
71    fn optimize_query(&self, query: &VectorQuery) -> Result<VectorQuery> {
72        let mut optimized = query.clone();
73
74        // Index selection optimization
75        if self.optimizer.enable_index_selection {
76            optimized.preferred_index = self.select_optimal_index(query)?;
77        }
78
79        // Caching optimization
80        if self.optimizer.enable_caching {
81            optimized.use_cache = true;
82        }
83
84        // Parallel execution optimization
85        if self.optimizer.enable_parallel_execution && query.can_parallelize() {
86            optimized.parallel_execution = true;
87        }
88
89        Ok(optimized)
90    }
91
92    /// Select optimal index for query execution
93    fn select_optimal_index(&self, query: &VectorQuery) -> Result<Option<String>> {
94        match query.operation_type.as_str() {
95            "similarity_search" => {
96                // For similarity search, index is usually better for large datasets
97                if query.estimated_result_size.unwrap_or(0) > 1000 {
98                    Ok(Some("hnsw".to_string()))
99                } else {
100                    Ok(Some("memory".to_string()))
101                }
102            }
103            "threshold_search" => {
104                // Threshold search benefits from approximate indices
105                Ok(Some("lsh".to_string()))
106            }
107            _ => Ok(None),
108        }
109    }
110
111    /// Execute query with internal optimizations
112    fn execute_query_internal(&mut self, query: &VectorQuery) -> Result<VectorQueryResult> {
113        // Check cache first if enabled
114        if query.use_cache {
115            if let Some(cached_result) = self.get_cached_result(&query.cache_key()) {
116                if let Some(ref monitor) = self.performance_monitor {
117                    monitor.record_cache_hit();
118                }
119                return Ok(cached_result.from_cache());
120            } else if let Some(ref monitor) = self.performance_monitor {
121                monitor.record_cache_miss();
122            }
123        }
124
125        let start_time = Instant::now();
126        let result = match query.operation_type.as_str() {
127            "similarity" => self.execute_similarity_query(query),
128            "similar" => self.execute_similar_query(query),
129            "search" | "search_text" => self.execute_search_query(query),
130            "searchIn" => self.execute_search_in_query(query),
131            "cluster" => self.execute_cluster_query(query),
132            "embed" | "embed_text" => self.execute_embed_query(query),
133            _ => Err(anyhow!("Unknown operation type: {}", query.operation_type)),
134        }?;
135
136        let execution_time = start_time.elapsed();
137        let query_result = VectorQueryResult::new(result, execution_time);
138
139        // Cache the result if caching is enabled
140        if query.use_cache {
141            self.cache_result(query.cache_key(), query_result.clone());
142        }
143
144        Ok(query_result)
145    }
146
147    /// Execute similarity query between two resources
148    fn execute_similarity_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
149        if query.args.len() < 2 {
150            return Err(anyhow!("Similarity query requires at least 2 arguments"));
151        }
152
153        let resource1 = match &query.args[0] {
154            VectorServiceArg::IRI(iri) => iri,
155            _ => return Err(anyhow!("First argument must be an IRI")),
156        };
157
158        let resource2 = match &query.args[1] {
159            VectorServiceArg::IRI(iri) => iri,
160            _ => return Err(anyhow!("Second argument must be an IRI")),
161        };
162
163        // Get vectors for both resources
164        let vector1 = self
165            .vector_store
166            .get_vector(&resource1.clone())
167            .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource1))?
168            .clone();
169        let vector2 = self
170            .vector_store
171            .get_vector(&resource2.clone())
172            .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource2))?
173            .clone();
174
175        // Calculate similarity
176        let similarity =
177            crate::similarity::cosine_similarity(&vector1.as_slice(), &vector2.as_slice());
178
179        Ok(vec![(format!("{resource1}-{resource2}"), similarity)])
180    }
181
182    /// Execute similar query to find similar resources
183    fn execute_similar_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
184        if query.args.is_empty() {
185            return Err(anyhow!("Similar query requires at least 1 argument"));
186        }
187
188        let resource = match &query.args[0] {
189            VectorServiceArg::IRI(iri) => iri,
190            _ => return Err(anyhow!("First argument must be an IRI")),
191        };
192
193        let limit = if query.args.len() > 1 {
194            match &query.args[1] {
195                VectorServiceArg::Number(n) => *n as usize,
196                _ => 10,
197            }
198        } else {
199            10
200        };
201
202        let _threshold = if query.args.len() > 2 {
203            match &query.args[2] {
204                VectorServiceArg::Number(n) => *n,
205                _ => 0.0,
206            }
207        } else {
208            0.0
209        };
210
211        // Get vector for the resource
212        let query_vector = self
213            .vector_store
214            .get_vector(&resource.clone())
215            .ok_or_else(|| anyhow!("Vector not found for resource: {}", resource))?
216            .clone();
217
218        // Perform similarity search
219        let results = self
220            .vector_store
221            .similarity_search_vector(&query_vector, limit)?;
222
223        Ok(results
224            .into_iter()
225            .filter(|(id, _)| id != resource) // Exclude the query resource itself
226            .collect())
227    }
228
229    /// Execute text search query
230    fn execute_search_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
231        if query.args.is_empty() {
232            return Err(anyhow!("Search query requires at least 1 argument"));
233        }
234
235        let query_text = match &query.args[0] {
236            VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
237            _ => return Err(anyhow!("First argument must be text")),
238        };
239
240        let limit = if query.args.len() > 1 {
241            match &query.args[1] {
242                VectorServiceArg::Number(n) => *n as usize,
243                _ => 10,
244            }
245        } else {
246            10
247        };
248
249        let threshold = if query.args.len() > 2 {
250            match &query.args[2] {
251                VectorServiceArg::Number(n) => *n,
252                _ => 0.7,
253            }
254        } else {
255            0.7
256        };
257
258        // Check for cross-language search parameters
259        let cross_language = if query.args.len() > 4 {
260            match &query.args[4] {
261                VectorServiceArg::String(val) => val == "true",
262                _ => false,
263            }
264        } else {
265            false
266        };
267
268        let target_languages = if query.args.len() > 5 {
269            match &query.args[5] {
270                VectorServiceArg::String(langs) => langs
271                    .split(',')
272                    .map(|s| s.trim().to_string())
273                    .collect::<Vec<_>>(),
274                _ => vec!["en".to_string()],
275            }
276        } else {
277            vec!["en".to_string()]
278        };
279
280        if cross_language {
281            self.execute_cross_language_search(query_text, limit, threshold, &target_languages)
282        } else {
283            self.execute_simple_text_search(query_text, limit, threshold)
284        }
285    }
286
287    /// Execute simple text search
288    fn execute_simple_text_search(
289        &mut self,
290        query_text: &str,
291        limit: usize,
292        _threshold: f32,
293    ) -> Result<Vec<(String, f32)>> {
294        // Generate embedding for the query text
295        let content = EmbeddableContent::Text(query_text.to_string());
296
297        let query_vector = self.embedding_manager.get_embedding(&content)?;
298
299        // Perform similarity search
300        self.vector_store
301            .similarity_search_vector(&query_vector, limit)
302    }
303
304    /// Execute cross-language search
305    fn execute_cross_language_search(
306        &mut self,
307        query_text: &str,
308        limit: usize,
309        _threshold: f32,
310        target_languages: &[String],
311    ) -> Result<Vec<(String, f32)>> {
312        // Process query with cross-language variations
313        let query_variations = self
314            .cross_language_processor
315            .process_cross_language_query(query_text, target_languages);
316
317        let mut all_results = Vec::new();
318
319        // Execute search for each query variation
320        for (variation_text, weight) in query_variations {
321            let content = EmbeddableContent::Text(variation_text);
322
323            if let Ok(query_vector) = self.embedding_manager.get_embedding(&content) {
324                if let Ok(results) = self
325                    .vector_store
326                    .similarity_search_vector(&query_vector, limit)
327                {
328                    for (id, score) in results {
329                        all_results.push((id, score * weight));
330                    }
331                }
332            }
333        }
334
335        // Merge and deduplicate results
336        let merged_results = self.merge_search_results(all_results, limit);
337        Ok(merged_results)
338    }
339
340    /// Execute graph-scoped search query
341    fn execute_search_in_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
342        if query.args.len() < 2 {
343            return Err(anyhow!("SearchIn query requires at least 2 arguments"));
344        }
345
346        let query_text = match &query.args[0] {
347            VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
348            _ => return Err(anyhow!("First argument must be query text")),
349        };
350
351        let graph_iri = match &query.args[1] {
352            VectorServiceArg::IRI(iri) => iri,
353            _ => return Err(anyhow!("Second argument must be a graph IRI")),
354        };
355
356        let limit = if query.args.len() > 2 {
357            match &query.args[2] {
358                VectorServiceArg::Number(n) => *n as usize,
359                _ => 10,
360            }
361        } else {
362            10
363        };
364
365        let scope_str = if query.args.len() > 3 {
366            match &query.args[3] {
367                VectorServiceArg::String(s) => s.as_str(),
368                _ => "exact",
369            }
370        } else {
371            "exact"
372        };
373
374        let threshold = if query.args.len() > 4 {
375            match &query.args[4] {
376                VectorServiceArg::Number(n) => *n,
377                _ => 0.7,
378            }
379        } else {
380            0.7
381        };
382
383        // Convert scope string to enum
384        let scope = match scope_str {
385            "children" => GraphSearchScope::IncludeChildren,
386            "parents" => GraphSearchScope::IncludeParents,
387            "hierarchy" => GraphSearchScope::FullHierarchy,
388            "related" => GraphSearchScope::Related,
389            _ => GraphSearchScope::Exact,
390        };
391
392        if let Some(ref _graph_search) = self.graph_aware_search {
393            let _context = GraphContext {
394                primary_graph: graph_iri.clone(),
395                additional_graphs: Vec::new(),
396                scope,
397                context_weights: HashMap::new(),
398            };
399
400            // Generate embedding for query text
401            let content = EmbeddableContent::Text(query_text.to_string());
402            let _query_vector = self.embedding_manager.get_embedding(&content)?;
403
404            // Since search_with_context doesn't exist, fallback to simple search
405            self.execute_simple_text_search(query_text, limit, threshold)
406        } else {
407            // Fallback to simple search if graph-aware search is not available
408            self.execute_simple_text_search(query_text, limit, threshold)
409        }
410    }
411
412    /// Execute clustering query.
413    ///
414    /// Argument protocol (all optional, positional):
415    /// - `args[0]` `Number` — number of clusters `k` (default: 3)
416    /// - `args[1]` `String` — algorithm name: `"kmeans"` (default), `"dbscan"`,
417    ///   `"hierarchical"`, `"spectral"`, `"community"`, `"similarity"`
418    /// - `args[2]` `Number` — similarity threshold for DBSCAN / similarity
419    ///   clustering (default: 0.7)
420    ///
421    /// Returns `Vec<(resource_id, cluster_id_as_f32)>` — one entry per member of
422    /// every non-empty cluster found in the store.  Resources that were not
423    /// assigned to any cluster (DBSCAN noise) are omitted.
424    ///
425    /// **Note:** only index implementations that override `iter_vectors()`
426    /// (currently `MemoryVectorIndex`) expose their vectors; other index types
427    /// will return an empty result.
428    fn execute_cluster_query(&self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
429        // --- parse arguments ------------------------------------------------
430        let num_clusters = if query.args.is_empty() {
431            3usize
432        } else {
433            match &query.args[0] {
434                VectorServiceArg::Number(n) => (*n as usize).max(1),
435                _ => 3,
436            }
437        };
438
439        let algorithm = if query.args.len() > 1 {
440            match &query.args[1] {
441                VectorServiceArg::String(s) | VectorServiceArg::Literal(s) => match s.as_str() {
442                    "dbscan" => ClusteringAlgorithm::DBSCAN,
443                    "hierarchical" => ClusteringAlgorithm::Hierarchical,
444                    "spectral" => ClusteringAlgorithm::Spectral,
445                    "community" => ClusteringAlgorithm::Community,
446                    "similarity" => ClusteringAlgorithm::Similarity,
447                    _ => ClusteringAlgorithm::KMeans,
448                },
449                _ => ClusteringAlgorithm::KMeans,
450            }
451        } else {
452            ClusteringAlgorithm::KMeans
453        };
454
455        let similarity_threshold = if query.args.len() > 2 {
456            match &query.args[2] {
457                VectorServiceArg::Number(n) => *n,
458                _ => 0.7,
459            }
460        } else {
461            0.7
462        };
463
464        // --- retrieve all indexed vectors -----------------------------------
465        let resources: Vec<(String, crate::Vector)> = self.vector_store.iter_vectors();
466
467        if resources.is_empty() {
468            return Ok(Vec::new());
469        }
470
471        // --- run clustering engine ------------------------------------------
472        let config = ClusteringConfig {
473            algorithm,
474            num_clusters: Some(num_clusters),
475            similarity_threshold,
476            ..ClusteringConfig::default()
477        };
478
479        let engine = ClusteringEngine::new(config);
480        let clustering_result = engine.cluster(&resources)?;
481
482        // --- flatten clusters into (resource_id, cluster_id_as_f32) pairs --
483        let mut output: Vec<(String, f32)> = Vec::new();
484        for cluster in &clustering_result.clusters {
485            let cluster_score = cluster.id as f32;
486            for member in &cluster.members {
487                output.push((member.clone(), cluster_score));
488            }
489        }
490
491        Ok(output)
492    }
493
494    /// Execute embedding generation query
495    fn execute_embed_query(&mut self, query: &VectorQuery) -> Result<Vec<(String, f32)>> {
496        if query.args.is_empty() {
497            return Err(anyhow!("Embed query requires at least 1 argument"));
498        }
499
500        let text = match &query.args[0] {
501            VectorServiceArg::String(text) | VectorServiceArg::Literal(text) => text,
502            _ => return Err(anyhow!("First argument must be text")),
503        };
504
505        let content = EmbeddableContent::Text(text.to_string());
506
507        let vector = self.embedding_manager.get_embedding(&content)?;
508
509        // Store the vector with a generated ID
510        let id = format!("embedded_{}", hash_string(text));
511        self.vector_store.index_vector(id.clone(), vector)?;
512
513        Ok(vec![(id, 1.0)])
514    }
515
516    /// Merge and deduplicate search results
517    fn merge_search_results(
518        &self,
519        results: Vec<(String, f32)>,
520        limit: usize,
521    ) -> Vec<(String, f32)> {
522        let mut result_map: HashMap<String, f32> = HashMap::new();
523
524        // Aggregate scores for duplicate IDs (take maximum score)
525        for (id, score) in results {
526            result_map
527                .entry(id)
528                .and_modify(|existing_score| *existing_score = existing_score.max(score))
529                .or_insert(score);
530        }
531
532        // Convert to vector and sort by score
533        let mut merged: Vec<(String, f32)> = result_map.into_iter().collect();
534        merged.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
535
536        // Apply limit
537        merged.truncate(limit);
538        merged
539    }
540
541    /// Get cached result
542    fn get_cached_result(&self, cache_key: &str) -> Option<VectorQueryResult> {
543        self.query_cache.get(cache_key).cloned()
544    }
545
546    /// Cache query result
547    fn cache_result(&mut self, cache_key: String, result: VectorQueryResult) {
548        // Simple cache with fixed size (in real implementation, use LRU or similar)
549        if self.query_cache.len() >= 1000 {
550            // Remove oldest entry (simplified)
551            if let Some(first_key) = self.query_cache.keys().next().cloned() {
552                self.query_cache.remove(&first_key);
553            }
554        }
555        self.query_cache.insert(cache_key, result);
556
557        // Update cache statistics
558        if let Some(ref monitor) = self.performance_monitor {
559            monitor.update_cache_size(self.query_cache.len(), 1000);
560        }
561    }
562
563    /// Clear query cache
564    pub fn clear_cache(&mut self) {
565        self.query_cache.clear();
566        if let Some(ref monitor) = self.performance_monitor {
567            monitor.update_cache_size(0, 1000);
568        }
569    }
570
571    /// Get cache statistics
572    pub fn cache_stats(&self) -> (usize, usize) {
573        (self.query_cache.len(), 1000)
574    }
575
576    /// Add a resource embedding to the vector store
577    pub fn add_resource_embedding(&mut self, uri: &str, content: &EmbeddableContent) -> Result<()> {
578        // Generate embedding for the content
579        let vector = self.embedding_manager.get_embedding(content)?;
580
581        // Insert the vector into the store with the URI as the key
582        self.vector_store.index_vector(uri.to_string(), vector)?;
583
584        Ok(())
585    }
586}
587
/// Hashes `s` with the standard library's `DefaultHasher` and returns the
/// 64-bit digest.
///
/// A fresh hasher is created for every call, so equal strings produce equal
/// digests.
fn hash_string(s: &str) -> u64 {
    use std::hash::{Hash, Hasher};

    let mut state = std::collections::hash_map::DefaultHasher::new();
    s.hash(&mut state);
    state.finish()
}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use crate::embeddings::EmbeddingStrategy;
602    use anyhow::Result;
603
604    #[test]
605    fn test_query_optimization() -> Result<()> {
606        let vector_store = VectorStore::new();
607        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?;
608        let optimizer = VectorQueryOptimizer::default();
609
610        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);
611
612        let query = VectorQuery::new(
613            "similarity_search".to_string(),
614            vec![
615                VectorServiceArg::IRI("http://example.org/resource1".to_string()),
616                VectorServiceArg::IRI("http://example.org/resource2".to_string()),
617            ],
618        );
619
620        let optimized = executor.optimize_query(&query)?;
621        assert!(optimized.use_cache);
622        Ok(())
623    }
624
625    #[test]
626    fn test_cache_key_generation() {
627        let query1 = VectorQuery::new(
628            "search".to_string(),
629            vec![VectorServiceArg::String("test".to_string())],
630        );
631
632        let query2 = VectorQuery::new(
633            "search".to_string(),
634            vec![VectorServiceArg::String("test".to_string())],
635        );
636
637        assert_eq!(query1.cache_key(), query2.cache_key());
638    }
639
640    #[test]
641    fn test_merge_search_results() -> Result<()> {
642        let vector_store = VectorStore::new();
643        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?;
644        let optimizer = VectorQueryOptimizer::default();
645
646        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);
647
648        let results = vec![
649            ("doc1".to_string(), 0.8),
650            ("doc2".to_string(), 0.9),
651            ("doc1".to_string(), 0.7), // Duplicate with lower score
652            ("doc3".to_string(), 0.6),
653        ];
654
655        let merged = executor.merge_search_results(results, 10);
656
657        assert_eq!(merged.len(), 3);
658        assert_eq!(merged[0].0, "doc2"); // Highest score first
659        assert_eq!(merged[1].1, 0.8); // doc1 should have max score of 0.8
660        Ok(())
661    }
662
663    /// Helpers shared by the cluster query tests.
664    mod cluster_test_helpers {
665        use crate::{MemoryVectorIndex, Vector, VectorIndex as _};
666
667        /// Build a `MemoryVectorIndex` with `n_per_cluster` vectors per cluster
668        /// group.  Each cluster group lives in a linearly-separated band of the
669        /// first dimension so that k-means can reliably separate them.
670        pub fn build_clustered_index(
671            n_clusters: usize,
672            n_per_cluster: usize,
673        ) -> Box<dyn crate::VectorIndex> {
674            let mut idx = MemoryVectorIndex::new();
675
676            // Vectors are made well-separated in cosine space: each cluster
677            // group points primarily along a different axis so cosine
678            // k-means can reliably separate them.  We use `n_clusters`
679            // dimensions (one per cluster) and a small noise in the others
680            // to keep every vector non-zero.
681            let dim = n_clusters.max(4); // at least 4 dims
682
683            for cluster in 0..n_clusters {
684                for member in 0..n_per_cluster {
685                    // Primary component: large value along the cluster's axis.
686                    // Noise: tiny values on all other axes so cosine distance
687                    // between clusters is close to 1 (orthogonal).
688                    let mut values = vec![0.001f32; dim];
689                    values[cluster] = 10.0 + (member as f32) * 0.01;
690                    let id = format!("cluster{cluster}_member{member}");
691                    idx.insert(id, Vector::new(values)).expect("insert ok");
692                }
693            }
694
695            Box::new(idx)
696        }
697    }
698
699    /// Happy path: 9 vectors in 3 clear groups → all 9 appear in the output
700    /// and exactly 3 distinct cluster-id values are present.
701    #[test]
702    fn test_cluster_query_happy_path() -> Result<()> {
703        use cluster_test_helpers::build_clustered_index;
704
705        let n_clusters = 3usize;
706        let n_per_cluster = 3usize;
707        let total = n_clusters * n_per_cluster;
708
709        // Vectors are pre-loaded into the index; no embedding generation needed.
710        let vector_store =
711            VectorStore::with_index(build_clustered_index(n_clusters, n_per_cluster));
712        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?;
713        let optimizer = VectorQueryOptimizer::default();
714        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);
715
716        let query = VectorQuery::new(
717            "cluster".to_string(),
718            vec![
719                VectorServiceArg::Number(n_clusters as f32), // k
720                VectorServiceArg::String("kmeans".to_string()),
721            ],
722        );
723
724        let results = executor.execute_cluster_query(&query)?;
725
726        // Every vector must appear exactly once.
727        assert_eq!(results.len(), total, "all members must be returned");
728
729        // There should be exactly `n_clusters` distinct cluster ids.
730        let mut cluster_ids: Vec<u32> = results
731            .iter()
732            .map(|(_, cid)| *cid as u32)
733            .collect::<std::collections::HashSet<_>>()
734            .into_iter()
735            .collect();
736        cluster_ids.sort();
737        assert_eq!(
738            cluster_ids.len(),
739            n_clusters,
740            "expected {n_clusters} distinct cluster ids, got {:?}",
741            cluster_ids
742        );
743
744        Ok(())
745    }
746
747    /// Empty store: cluster query must return an empty vec, not an error.
748    #[test]
749    fn test_cluster_query_empty_store() -> Result<()> {
750        let vector_store = VectorStore::new(); // no vectors
751        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?;
752        let optimizer = VectorQueryOptimizer::default();
753        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);
754
755        let query = VectorQuery::new("cluster".to_string(), vec![VectorServiceArg::Number(3.0)]);
756
757        let results = executor.execute_cluster_query(&query)?;
758        assert!(
759            results.is_empty(),
760            "empty store must yield empty cluster result"
761        );
762        Ok(())
763    }
764
765    /// Invalid k (k >= n): the `ClusteringEngine` must return an error.
766    #[test]
767    fn test_cluster_query_invalid_k() -> Result<()> {
768        use cluster_test_helpers::build_clustered_index;
769
770        // 2 vectors, k=5 → k >= n → engine error expected.
771        let n_clusters = 1usize;
772        let n_per_cluster = 2usize;
773
774        let vector_store =
775            VectorStore::with_index(build_clustered_index(n_clusters, n_per_cluster));
776        let embedding_manager = EmbeddingManager::new(EmbeddingStrategy::TfIdf, 100)?;
777        let optimizer = VectorQueryOptimizer::default();
778        let executor = QueryExecutor::new(vector_store, embedding_manager, optimizer, None, None);
779
780        let query = VectorQuery::new(
781            "cluster".to_string(),
782            vec![
783                VectorServiceArg::Number(5.0), // k=5, but only 2 vectors
784                VectorServiceArg::String("kmeans".to_string()),
785            ],
786        );
787
788        let result = executor.execute_cluster_query(&query);
789        assert!(
790            result.is_err(),
791            "k >= n should produce an error from the clustering engine"
792        );
793        Ok(())
794    }
795}