use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use tracing::{info, span, Level};
use crate::algebra::{Algebra, Expression, Term, TriplePattern, Variable};
use crate::integrated_query_planner::{
IntegratedExecutionPlan, IntegratedPlannerConfig, IntegratedQueryPlanner,
};
/// Tunable settings for [`VectorQueryOptimizer`].
///
/// In the code visible in this file only `similarity_threshold` and
/// `max_vector_candidates` are read back (by `select_vector_strategy`); the
/// other fields are stored on the optimizer but not consulted here.
#[derive(Debug, Clone)]
pub struct VectorOptimizerConfig {
/// Presumably the master on/off switch for vector optimization; it is not
/// checked by the visible planning code — TODO confirm where it is honored.
pub enable_vector_optimization: bool,
/// Similarity threshold copied into the `PureVector` and `VectorConstrained`
/// strategies built by `select_vector_strategy`.
pub similarity_threshold: f32,
/// Copied into `VectorFilter::max_matches` for `VectorConstrained` strategies.
pub max_vector_candidates: usize,
/// Presumably a capacity for the optimizer caches; no visible code enforces
/// it — TODO confirm.
pub vector_cache_size: usize,
/// Presumably enables hybrid text + vector search; not checked by the
/// visible code — TODO confirm.
pub enable_hybrid_search: bool,
/// Presumably the expected embedding length; not validated by the visible
/// code — TODO confirm.
pub embedding_dimension: usize,
/// Distance metric setting; not read by the visible code.
pub distance_metric: VectorDistanceMetric,
/// Index types listed as preferred. `select_vector_index` does not consult
/// this list; it scores indexes with fixed per-type bonuses instead.
pub preferred_index_types: Vec<VectorIndexType>,
/// Presumably a query-complexity cut-off; not read by the visible code —
/// TODO confirm meaning and units.
pub complexity_threshold: f64,
}
impl Default for VectorOptimizerConfig {
    /// Returns the stock configuration: optimization and hybrid search
    /// enabled, similarity threshold 0.8, at most 1000 candidates, a cache
    /// size of 10 000, 768-dimensional embeddings, cosine distance, and
    /// HNSW / IVF-Flat / IVF-PQ as the preferred index types.
    fn default() -> Self {
        let preferred_index_types = vec![
            VectorIndexType::Hnsw,
            VectorIndexType::IvfFlat,
            VectorIndexType::IvfPq,
        ];

        Self {
            // Feature switches.
            enable_vector_optimization: true,
            enable_hybrid_search: true,
            // Numeric limits and thresholds.
            similarity_threshold: 0.8,
            max_vector_candidates: 1000,
            vector_cache_size: 10_000,
            complexity_threshold: 10.0,
            // Embedding / index characteristics.
            embedding_dimension: 768,
            distance_metric: VectorDistanceMetric::Cosine,
            preferred_index_types,
        }
    }
}
/// Names the distance/similarity function associated with a configuration or
/// a registered vector index.
///
/// The visible code only stores and copies this value; no distance
/// computation is implemented in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VectorDistanceMetric {
/// Cosine metric (the default in `VectorOptimizerConfig::default`).
Cosine,
/// Euclidean metric.
Euclidean,
/// Dot-product metric.
DotProduct,
/// Manhattan metric.
Manhattan,
}
/// Kind of vector index described by a [`VectorIndexInfo`].
///
/// `calculate_index_score` gives each kind a fixed base bonus: `Hnsw` 1.0,
/// `IvfPq` 0.8, `IvfFlat` 0.7, `Lsh` 0.6 and `FlatIndex` 0.5.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VectorIndexType {
/// HNSW index; also earns an extra 0.2 for `PureVector` searches with `k <= 100`.
Hnsw,
/// IVF-Flat index.
IvfFlat,
/// IVF-PQ index.
IvfPq,
/// Flat index.
FlatIndex,
/// LSH index.
Lsh,
}
/// Search strategy chosen for a query by `select_vector_strategy`.
#[derive(Debug, Clone)]
pub enum VectorSearchStrategy {
/// Vector-only search. Used as the fallback strategy; the planner in this
/// file creates it with an empty `query_vector` and `k = 100`.
PureVector {
/// Query embedding (left empty by the visible planner).
query_vector: Vec<f32>,
/// Threshold copied from `VectorOptimizerConfig::similarity_threshold`.
similarity_threshold: f32,
/// Number of results requested.
k: usize,
},
/// Combined text + vector search, chosen for `TextSimilarity` opportunities.
Hybrid {
/// Text taken from the triple pattern's object.
text_query: String,
/// Optional query embedding (`None` when created by the visible planner).
query_vector: Option<Vec<f32>>,
/// Weight of the text score (0.6 when created by the visible planner).
text_weight: f32,
/// Weight of the vector score (0.4 when created by the visible planner).
vector_weight: f32,
},
/// SPARQL patterns restricted by a vector filter, chosen for `VectorJoin`
/// opportunities.
VectorConstrained {
/// Patterns to constrain (left empty by the visible planner).
sparql_patterns: Vec<TriplePattern>,
/// Filter carrying the threshold and match limit from the configuration.
vector_filter: VectorFilter,
},
/// Expansion of terms to similar terms, chosen for `EntitySimilarity`
/// opportunities.
SemanticExpansion {
/// Terms to expand (the pattern subject when created by the visible planner).
original_terms: Vec<Term>,
/// Candidate terms paired with an `f32` value, presumably a similarity
/// score — TODO confirm. Left empty by the visible planner.
expansion_candidates: Vec<(Term, f32)>,
/// Upper bound on expansions (10 when created by the visible planner).
max_expansions: usize,
},
}
/// Vector-based constraint attached to a `VectorConstrained` strategy.
///
/// The planner in this file always creates it with all three vectors set to
/// `None`.
#[derive(Debug, Clone)]
pub struct VectorFilter {
/// Optional embedding for the subject position.
pub subject_vector: Option<Vec<f32>>,
/// Optional embedding for the object position.
pub object_vector: Option<Vec<f32>>,
/// Optional embedding for the predicate position.
pub predicate_vector: Option<Vec<f32>>,
/// Threshold copied from `VectorOptimizerConfig::similarity_threshold`.
pub similarity_threshold: f32,
/// Limit copied from `VectorOptimizerConfig::max_vector_candidates`.
pub max_matches: usize,
}
/// Query optimizer that wraps an [`IntegratedQueryPlanner`] and augments its
/// plans with a vector search strategy, a recommended vector index and a
/// performance estimate.
pub struct VectorQueryOptimizer {
/// Optimizer settings supplied at construction.
config: VectorOptimizerConfig,
/// Planner that produces the base execution plan.
integrated_planner: IntegratedQueryPlanner,
/// Registered vector indexes, keyed by index name.
vector_indexes: Arc<Mutex<HashMap<String, VectorIndexInfo>>>,
// Created empty in `new` and never read by the code visible in this file,
// hence the lint suppression.
#[allow(dead_code)]
embedding_cache: Arc<Mutex<HashMap<String, Vec<f32>>>>,
// Created empty in `new` and never read by the code visible in this file.
#[allow(dead_code)]
#[allow(clippy::type_complexity)]
semantic_cache: Arc<Mutex<HashMap<String, Vec<(String, f32)>>>>,
// Created empty in `new` and never read by the code visible in this file.
#[allow(dead_code)]
query_patterns: Arc<Mutex<HashMap<u64, VectorSearchStrategy>>>,
/// Running counters and averages, exposed through `get_performance_metrics`.
performance_metrics: Arc<Mutex<VectorPerformanceMetrics>>,
}
/// Description of a vector index registered through `register_vector_index`.
#[derive(Debug, Clone)]
pub struct VectorIndexInfo {
/// Kind of index; determines the base bonus in `calculate_index_score`.
pub index_type: VectorIndexType,
/// Vector dimension (not read by the visible code).
pub dimension: usize,
/// Size of the index; logged as the number of vectors on registration.
pub size: usize,
/// Distance metric of the index (not read by the visible code).
pub distance_metric: VectorDistanceMetric,
/// Build time of the index (not read by the visible code).
pub build_time: Duration,
/// Last update time (not read by the visible code).
pub last_updated: Instant,
/// Accuracy statistics; the recall entry for `k = 10` feeds scoring and
/// recall estimation.
pub accuracy_stats: IndexAccuracyStats,
/// Performance statistics; feed both index scoring and performance estimates.
pub performance_stats: IndexPerformanceStats,
}
/// Accuracy statistics recorded for a vector index.
#[derive(Debug, Clone, Default)]
pub struct IndexAccuracyStats {
/// Recall values keyed by `k`. Only the entry for `k = 10` is read by the
/// visible code (`calculate_index_score`, `estimate_vector_performance`).
pub recall_at_k: HashMap<usize, f32>,
/// Precision values keyed by `k` (not read by the visible code).
pub precision_at_k: HashMap<usize, f32>,
/// Average distance error (not read by the visible code).
pub average_distance_error: f32,
/// Query count (not read by the visible code).
pub query_count: usize,
}
/// Performance statistics recorded for a vector index.
#[derive(Debug, Clone, Default)]
pub struct IndexPerformanceStats {
/// Used as the estimated query time when this index is recommended.
pub average_query_time: Duration,
/// Contributes `queries_per_second / 1000.0` to the index score.
pub queries_per_second: f32,
/// Used as the estimated memory usage when this index is recommended;
/// presumably in bytes — TODO confirm.
pub memory_usage: usize,
/// Added directly to the index score.
pub cache_hit_rate: f32,
/// Added directly to the index score.
pub index_efficiency: f32,
}
/// Counters and averages kept by [`VectorQueryOptimizer`].
#[derive(Debug, Clone, Default)]
pub struct VectorPerformanceMetrics {
/// Incremented each time a `PureVector` strategy is selected.
pub vector_queries_optimized: usize,
/// Incremented each time a `Hybrid` strategy is selected.
pub hybrid_queries_optimized: usize,
/// Incremented each time a `SemanticExpansion` strategy is selected.
pub semantic_expansions_performed: usize,
/// Running average maintained by `update_vector_execution_feedback`.
pub average_optimization_speedup: f32,
/// Not written by the code visible in this file.
pub vector_cache_hit_rate: f32,
/// Not written by the code visible in this file.
pub embedding_generation_time: Duration,
/// Not written by the code visible in this file.
pub total_optimization_time: Duration,
}
/// Result of `VectorQueryOptimizer::create_vector_enhanced_plan`: the base
/// plan plus the vector-specific decisions made for the query.
#[derive(Debug, Clone)]
pub struct VectorEnhancedPlan {
/// Plan produced by the integrated planner.
pub base_plan: IntegratedExecutionPlan,
/// Selected strategy, or `None` when no vector opportunity was found.
pub vector_strategy: Option<VectorSearchStrategy>,
/// Name of the best-scoring registered index, if any.
pub recommended_vector_index: Option<String>,
/// Performance estimate; left at its `Default` value when no vector
/// opportunity was found.
pub vector_performance_estimate: VectorPerformanceEstimate,
/// Hybrid search settings; only set for the `Hybrid` strategy.
pub hybrid_config: Option<HybridSearchConfig>,
}
/// Estimated cost and quality of executing a vector search strategy.
#[derive(Debug, Clone, Default)]
pub struct VectorPerformanceEstimate {
/// Estimated execution time.
pub estimated_query_time: Duration,
/// Estimated recall.
pub estimated_recall: f32,
/// Estimated memory usage; presumably in bytes — TODO confirm.
pub estimated_memory_usage: usize,
/// Confidence in the estimate; the visible estimator uses 0.8 when index
/// statistics are available and 0.5 when no index is recommended.
pub confidence: f32,
}
/// Settings for combining text and vector results; produced by
/// `configure_hybrid_search` for `Hybrid` strategies.
#[derive(Debug, Clone)]
pub struct HybridSearchConfig {
/// Weight of the text score, copied from the strategy.
pub text_weight: f32,
/// Weight of the vector score, copied from the strategy.
pub vector_weight: f32,
/// Re-ranking depth (100 when produced by `configure_hybrid_search`).
pub reranking_k: usize,
/// How the two result lists are fused.
pub fusion_method: ResultFusionMethod,
}
/// Method used to fuse text and vector result lists.
///
/// Only `LinearCombination` is produced by the code visible in this file.
#[derive(Debug, Clone, Copy)]
pub enum ResultFusionMethod {
/// Linear combination; the value chosen by `configure_hybrid_search`.
LinearCombination,
/// Rank fusion.
RankFusion,
/// Bayesian fusion.
BayesianFusion,
/// Learning-to-rank.
LearningToRank,
}
impl VectorQueryOptimizer {
pub fn new(
vector_config: VectorOptimizerConfig,
planner_config: IntegratedPlannerConfig,
) -> Result<Self> {
let integrated_planner = IntegratedQueryPlanner::new(planner_config)?;
Ok(Self {
config: vector_config,
integrated_planner,
vector_indexes: Arc::new(Mutex::new(HashMap::new())),
embedding_cache: Arc::new(Mutex::new(HashMap::new())),
semantic_cache: Arc::new(Mutex::new(HashMap::new())),
query_patterns: Arc::new(Mutex::new(HashMap::new())),
performance_metrics: Arc::new(Mutex::new(VectorPerformanceMetrics::default())),
})
}
/// Registers `index_info` under `name`, replacing any index already stored
/// under that name, and logs the registration.
///
/// # Panics
///
/// Panics if the index registry mutex is poisoned.
pub fn register_vector_index(&self, name: String, index_info: VectorIndexInfo) -> Result<()> {
    // Capture the size before `index_info` is moved into the registry.
    let vector_count = index_info.size;
    let mut registry = self.vector_indexes.lock().expect("lock poisoned");
    registry.insert(name.clone(), index_info);
    info!(
        "Registered vector index: {} with {} vectors",
        name, vector_count
    );
    Ok(())
}
/// Builds the base plan for `algebra` and, when vector opportunities are
/// found, attaches a strategy, a recommended index, a performance estimate
/// and (for hybrid strategies) a hybrid search configuration.
///
/// When no opportunity is found the base plan is returned with all vector
/// fields empty or defaulted, and the metrics are left untouched.
///
/// # Errors
///
/// Propagates errors from the integrated planner and from the analysis and
/// selection steps.
pub fn create_vector_enhanced_plan(&mut self, algebra: &Algebra) -> Result<VectorEnhancedPlan> {
    let planning_span = span!(Level::DEBUG, "vector_enhanced_planning");
    let _guard = planning_span.enter();

    let base_plan = self.integrated_planner.create_plan(algebra)?;
    let candidates = self.analyze_vector_opportunities(algebra)?;

    // Nothing to enhance: hand back the plain plan.
    if candidates.is_empty() {
        return Ok(VectorEnhancedPlan {
            base_plan,
            vector_strategy: None,
            recommended_vector_index: None,
            vector_performance_estimate: VectorPerformanceEstimate::default(),
            hybrid_config: None,
        });
    }

    let strategy = self.select_vector_strategy(&candidates, algebra)?;
    let index_choice = self.select_vector_index(&strategy)?;
    let estimate = self.estimate_vector_performance(&strategy, &index_choice)?;
    let hybrid = self.configure_hybrid_search(&strategy)?;
    self.update_optimization_metrics(&strategy);

    Ok(VectorEnhancedPlan {
        base_plan,
        vector_strategy: Some(strategy),
        recommended_vector_index: index_choice,
        vector_performance_estimate: estimate,
        hybrid_config: hybrid,
    })
}
/// Recursively walks `algebra` and collects every vector opportunity found.
///
/// Basic graph patterns are inspected triple by triple, filters contribute
/// a `SemanticFilter` opportunity when their condition looks semantic,
/// joins additionally contribute a possible `VectorJoin`, and wrapper
/// operators are unwrapped through `extract_subpattern`.
fn analyze_vector_opportunities(&self, algebra: &Algebra) -> Result<Vec<VectorOpportunity>> {
    let mut found = Vec::new();

    match algebra {
        Algebra::Bgp(patterns) => found.extend(self.analyze_bgp_patterns(patterns)?),
        Algebra::Filter { pattern, condition } => {
            // The filter's own opportunity comes before those of its operand.
            if self.is_semantic_filter(condition) {
                found.push(VectorOpportunity::SemanticFilter {
                    condition: condition.clone(),
                    estimated_selectivity: 0.1,
                });
            }
            found.extend(self.analyze_vector_opportunities(pattern)?);
        }
        Algebra::Join { left, right } => {
            for side in [left, right] {
                found.extend(self.analyze_vector_opportunities(side)?);
            }
            // The join-level opportunity (if any) goes after both operands.
            found.extend(self.analyze_join_opportunity(left, right)?);
        }
        Algebra::Union { left, right }
        | Algebra::LeftJoin {
            left,
            right,
            filter: _,
        } => {
            for side in [left, right] {
                found.extend(self.analyze_vector_opportunities(side)?);
            }
        }
        other => {
            if let Some(inner) = self.extract_subpattern(other) {
                found.extend(self.analyze_vector_opportunities(&inner)?);
            }
        }
    }

    Ok(found)
}
/// Inspects each triple pattern and emits up to three opportunities per
/// pattern, in the order text similarity, entity similarity, property
/// expansion.
fn analyze_bgp_patterns(&self, patterns: &[TriplePattern]) -> Result<Vec<VectorOpportunity>> {
    let found = patterns
        .iter()
        .flat_map(|triple| {
            let text = self.is_text_matching_pattern(triple).then(|| {
                VectorOpportunity::TextSimilarity {
                    pattern: triple.clone(),
                    estimated_matches: 100,
                }
            });
            let entity = self.is_entity_similarity_pattern(triple).then(|| {
                VectorOpportunity::EntitySimilarity {
                    pattern: triple.clone(),
                    similarity_type: EntitySimilarityType::Conceptual,
                }
            });
            let expansion = self.is_expandable_property_pattern(triple).then(|| {
                VectorOpportunity::PropertyExpansion {
                    pattern: triple.clone(),
                    expansion_depth: 2,
                }
            });
            text.into_iter().chain(entity).chain(expansion)
        })
        .collect();

    Ok(found)
}
/// Returns `true` when the pattern's object is a literal whose value is
/// longer than five bytes and contains at least one alphabetic character.
fn is_text_matching_pattern(&self, pattern: &TriplePattern) -> bool {
    matches!(
        &pattern.object,
        Term::Literal(lit)
            if lit.value.len() > 5 && lit.value.chars().any(|ch| ch.is_alphabetic())
    )
}
/// Returns `true` when the predicate is an IRI containing one of the
/// substrings `similar`, `related`, `type` or `category`.
fn is_entity_similarity_pattern(&self, pattern: &TriplePattern) -> bool {
    const SIMILARITY_MARKERS: [&str; 4] = ["similar", "related", "type", "category"];

    if let Term::Iri(iri) = &pattern.predicate {
        let predicate = iri.as_str();
        SIMILARITY_MARKERS
            .iter()
            .any(|marker| predicate.contains(*marker))
    } else {
        false
    }
}
/// Returns `true` when the predicate is a variable, or an IRI containing
/// one of a fixed set of topic-like substrings (`type`, `category`, ...).
fn is_expandable_property_pattern(&self, pattern: &TriplePattern) -> bool {
    const EXPANDABLE_FRAGMENTS: &[&str] = &[
        "type",
        "category",
        "topic",
        "subject",
        "theme",
        "describes",
        "about",
        "concerns",
        "deals_with",
    ];

    match &pattern.predicate {
        // An unbound predicate can always be expanded.
        Term::Variable(_) => true,
        Term::Iri(iri) => {
            let predicate = iri.as_str();
            EXPANDABLE_FRAGMENTS
                .iter()
                .any(|fragment| predicate.contains(*fragment))
        }
        _ => false,
    }
}
/// Returns `true` when `expression` is a function call whose name contains
/// `similarity`, `match`, `distance` or `semantic`.
fn is_semantic_filter(&self, expression: &Expression) -> bool {
    const SEMANTIC_MARKERS: [&str; 4] = ["similarity", "match", "distance", "semantic"];

    if let Expression::Function { name, .. } = expression {
        let function_name = name.as_str();
        SEMANTIC_MARKERS
            .iter()
            .any(|marker| function_name.contains(*marker))
    } else {
        false
    }
}
/// Looks for a variable shared by both join operands that is suitable for
/// a vector join and, if one exists, returns a `VectorJoin` opportunity
/// for the first such variable encountered.
///
/// Shared variables come from a `HashSet` intersection, so which variable
/// is "first" is not a stable order.
fn analyze_join_opportunity(
    &self,
    left: &Algebra,
    right: &Algebra,
) -> Result<Option<VectorOpportunity>> {
    let left_vars = self.extract_variables(left);
    let right_vars = self.extract_variables(right);

    let join_variable = left_vars.intersection(&right_vars).find(|candidate| {
        self.is_vector_suitable_variable(candidate, left)
            || self.is_vector_suitable_variable(candidate, right)
    });

    Ok(join_variable.map(|variable| VectorOpportunity::VectorJoin {
        left_pattern: Box::new(left.clone()),
        right_pattern: Box::new(right.clone()),
        join_variable: variable.clone(),
        estimated_selectivity: 0.2,
    }))
}
/// Collects the variables appearing in the subject, predicate or object
/// position of a basic graph pattern.
///
/// Any other algebra node yields an empty set; nested operators are not
/// traversed.
fn extract_variables(&self, algebra: &Algebra) -> HashSet<Variable> {
    let mut collected = HashSet::new();

    if let Algebra::Bgp(patterns) = algebra {
        for triple in patterns {
            for term in [&triple.subject, &triple.predicate, &triple.object] {
                if let Term::Variable(variable) = term {
                    collected.insert(variable.clone());
                }
            }
        }
    }

    collected
}
/// Reports whether a variable is suitable for a vector join.
///
/// Currently every variable is accepted; both arguments are ignored.
fn is_vector_suitable_variable(&self, _var: &Variable, _context: &Algebra) -> bool {
    true
}
/// Returns a clone of the inner pattern of a single-operand wrapper node
/// (`Project`, `Distinct`, `Reduced`, `OrderBy`, `Slice`, `Group`,
/// `Having`), or `None` for any other node.
fn extract_subpattern(&self, algebra: &Algebra) -> Option<Algebra> {
    match algebra {
        Algebra::Project { pattern, .. }
        | Algebra::Distinct { pattern }
        | Algebra::Reduced { pattern }
        | Algebra::OrderBy { pattern, .. }
        | Algebra::Slice { pattern, .. }
        | Algebra::Group { pattern, .. }
        | Algebra::Having { pattern, .. } => Some(Algebra::clone(pattern)),
        _ => None,
    }
}
/// Chooses a search strategy from the first opportunity in `opportunities`.
///
/// * `TextSimilarity` -> `Hybrid` (text weight 0.6, vector weight 0.4)
/// * `EntitySimilarity` -> `SemanticExpansion` of the pattern's subject
/// * `VectorJoin` -> `VectorConstrained` using the configured threshold
///   and candidate limit
/// * anything else -> `PureVector` with `k = 100`
///
/// # Errors
///
/// Fails when `opportunities` is empty, or when no text can be extracted
/// from a `TextSimilarity` pattern.
fn select_vector_strategy(
    &self,
    opportunities: &[VectorOpportunity],
    _algebra: &Algebra,
) -> Result<VectorSearchStrategy> {
    let primary = opportunities
        .first()
        .ok_or_else(|| anyhow!("No vector opportunities available"))?;

    let strategy = match primary {
        VectorOpportunity::TextSimilarity { pattern, .. } => VectorSearchStrategy::Hybrid {
            text_query: self.extract_text_from_pattern(pattern)?,
            query_vector: None,
            text_weight: 0.6,
            vector_weight: 0.4,
        },
        VectorOpportunity::EntitySimilarity { pattern, .. } => {
            VectorSearchStrategy::SemanticExpansion {
                original_terms: vec![pattern.subject.clone()],
                expansion_candidates: Vec::new(),
                max_expansions: 10,
            }
        }
        VectorOpportunity::VectorJoin { .. } => VectorSearchStrategy::VectorConstrained {
            sparql_patterns: vec![],
            vector_filter: VectorFilter {
                subject_vector: None,
                object_vector: None,
                predicate_vector: None,
                similarity_threshold: self.config.similarity_threshold,
                max_matches: self.config.max_vector_candidates,
            },
        },
        VectorOpportunity::PropertyExpansion { .. }
        | VectorOpportunity::SemanticFilter { .. } => VectorSearchStrategy::PureVector {
            query_vector: Vec::new(),
            similarity_threshold: self.config.similarity_threshold,
            k: 100,
        },
    };

    Ok(strategy)
}
/// Extracts searchable text from the object of `pattern`.
///
/// * A literal yields its value.
/// * An IRI yields its local name: the part after the last `#`, or, when
///   the IRI has no `#`, the part after the last `/`. If that part would
///   be empty (or the IRI contains neither separator) the whole IRI is
///   returned.
///
/// # Errors
///
/// Fails when the object is neither a literal nor an IRI.
fn extract_text_from_pattern(&self, pattern: &TriplePattern) -> Result<String> {
    match &pattern.object {
        Term::Literal(literal) => Ok(literal.value.clone()),
        Term::Iri(iri) => {
            let iri_str = iri.as_str();
            // `split('#').next_back()` always yields `Some`, which made the
            // `/` fallback unreachable; `rsplit_once` returns `None` when the
            // separator is absent, so the fallback chain actually runs.
            let local_name = iri_str
                .rsplit_once('#')
                .or_else(|| iri_str.rsplit_once('/'))
                .map(|(_, tail)| tail)
                .filter(|tail| !tail.is_empty())
                .unwrap_or(iri_str);
            Ok(local_name.to_string())
        }
        _ => Err(anyhow!("Cannot extract text from pattern")),
    }
}
/// Returns the name of the registered index with the highest score for
/// `strategy`, or `None` when no index is registered or none scores
/// strictly above zero.
///
/// # Panics
///
/// Panics if the index registry mutex is poisoned.
fn select_vector_index(&self, strategy: &VectorSearchStrategy) -> Result<Option<String>> {
    let registry = self.vector_indexes.lock().expect("lock poisoned");

    // Carry (best name so far, best score so far); a candidate only wins
    // with a strictly greater score, starting from a floor of 0.0.
    let (winner, _) = registry.iter().fold(
        (None, 0.0f32),
        |(leader, top_score), (name, info)| {
            let score = self.calculate_index_score(info, strategy);
            if score > top_score {
                (Some(name), score)
            } else {
                (leader, top_score)
            }
        },
    );

    Ok(winner.cloned())
}
/// Scores an index for `strategy`; higher is better.
///
/// The score is the sum of a fixed per-type bonus, throughput
/// (`queries_per_second / 1000`), cache hit rate, index efficiency, the
/// recall at `k = 10` when recorded, and a small strategy-specific bonus
/// (0.2 for HNSW with a `PureVector` search of `k <= 100`, 0.1 for any
/// index with a `Hybrid` search).
fn calculate_index_score(
    &self,
    info: &VectorIndexInfo,
    strategy: &VectorSearchStrategy,
) -> f32 {
    let perf = &info.performance_stats;

    // Terms are accumulated in a fixed order so the result is reproducible.
    let mut total = 0.0f32;
    total += match info.index_type {
        VectorIndexType::Hnsw => 1.0,
        VectorIndexType::IvfPq => 0.8,
        VectorIndexType::IvfFlat => 0.7,
        VectorIndexType::FlatIndex => 0.5,
        VectorIndexType::Lsh => 0.6,
    };
    total += perf.queries_per_second / 1000.0;
    total += perf.cache_hit_rate;
    total += perf.index_efficiency;

    if let Some(recall_at_10) = info.accuracy_stats.recall_at_k.get(&10) {
        total += recall_at_10;
    }

    let is_hnsw = matches!(info.index_type, VectorIndexType::Hnsw);
    match strategy {
        VectorSearchStrategy::PureVector { k, .. } if *k <= 100 && is_hnsw => total += 0.2,
        VectorSearchStrategy::Hybrid { .. } => total += 0.1,
        _ => {}
    }

    total
}
fn estimate_vector_performance(
&self,
strategy: &VectorSearchStrategy,
index_name: &Option<String>,
) -> Result<VectorPerformanceEstimate> {
let mut estimate = VectorPerformanceEstimate::default();
if let Some(name) = index_name {
let indexes = self.vector_indexes.lock().expect("lock poisoned");
if let Some(info) = indexes.get(name) {
estimate.estimated_query_time = info.performance_stats.average_query_time;
estimate.estimated_memory_usage = info.performance_stats.memory_usage;
estimate.estimated_recall = match strategy {
VectorSearchStrategy::PureVector { .. } => {
*info.accuracy_stats.recall_at_k.get(&10).unwrap_or(&0.9)
}
VectorSearchStrategy::Hybrid { .. } => {
info.accuracy_stats.recall_at_k.get(&10).unwrap_or(&0.9) * 1.1
}
_ => 0.8, };
estimate.confidence = 0.8; }
} else {
estimate.estimated_query_time = Duration::from_millis(100);
estimate.estimated_recall = 0.7;
estimate.estimated_memory_usage = 1024 * 1024; estimate.confidence = 0.5;
}
Ok(estimate)
}
/// Builds the hybrid search configuration for a `Hybrid` strategy,
/// copying its weights and using linear combination with a re-ranking
/// depth of 100. Every other strategy yields `None`.
fn configure_hybrid_search(
    &self,
    strategy: &VectorSearchStrategy,
) -> Result<Option<HybridSearchConfig>> {
    let hybrid_config = if let VectorSearchStrategy::Hybrid {
        text_weight,
        vector_weight,
        ..
    } = strategy
    {
        Some(HybridSearchConfig {
            text_weight: *text_weight,
            vector_weight: *vector_weight,
            reranking_k: 100,
            fusion_method: ResultFusionMethod::LinearCombination,
        })
    } else {
        None
    };

    Ok(hybrid_config)
}
/// Increments the counter matching the selected strategy.
/// `VectorConstrained` strategies are not counted.
///
/// # Panics
///
/// Panics if the metrics mutex is poisoned.
fn update_optimization_metrics(&self, strategy: &VectorSearchStrategy) {
    let mut metrics = self.performance_metrics.lock().expect("lock poisoned");

    let counter = match strategy {
        VectorSearchStrategy::PureVector { .. } => &mut metrics.vector_queries_optimized,
        VectorSearchStrategy::Hybrid { .. } => &mut metrics.hybrid_queries_optimized,
        VectorSearchStrategy::SemanticExpansion { .. } => {
            &mut metrics.semantic_expansions_performed
        }
        VectorSearchStrategy::VectorConstrained { .. } => return,
    };
    *counter += 1;
}
pub fn get_performance_metrics(&self) -> VectorPerformanceMetrics {
self.performance_metrics
.lock()
.expect("lock poisoned")
.clone()
}
/// Folds the outcome of an executed query into the running average
/// speedup.
///
/// The speedup is a fixed 500 ms baseline divided by `actual_duration`.
/// Unsuccessful executions, zero-length durations (which have no finite
/// speedup) and the case where no vector or hybrid query has been
/// optimized yet leave the metrics unchanged.
///
/// `_strategy_hash`, `_actual_recall` and `_actual_memory` are currently
/// ignored.
///
/// NOTE(review): the average is weighted by the number of optimized
/// vector + hybrid queries, not by the number of feedback samples —
/// confirm this is intended.
///
/// # Panics
///
/// Panics if the metrics mutex is poisoned.
pub fn update_vector_execution_feedback(
    &mut self,
    _strategy_hash: u64,
    actual_duration: Duration,
    _actual_recall: f32,
    _actual_memory: usize,
    success: bool,
) -> Result<()> {
    if !success {
        return Ok(());
    }

    // Work in fractional seconds: `as_millis()` truncates sub-millisecond
    // durations to 0, which made the division produce `inf` and corrupted
    // the running average permanently.
    let baseline = Duration::from_millis(500);
    let speedup = baseline.as_secs_f32() / actual_duration.as_secs_f32();
    if !speedup.is_finite() {
        return Ok(());
    }

    let mut metrics = self.performance_metrics.lock().expect("lock poisoned");
    let total_optimizations =
        metrics.vector_queries_optimized + metrics.hybrid_queries_optimized;
    if total_optimizations > 0 {
        let previous_sum =
            metrics.average_optimization_speedup * (total_optimizations - 1) as f32;
        metrics.average_optimization_speedup =
            (previous_sum + speedup) / total_optimizations as f32;
    }
    Ok(())
}
}
/// A place in a query where vector search could be applied, as detected by
/// `VectorQueryOptimizer::analyze_vector_opportunities`.
#[derive(Debug, Clone)]
pub enum VectorOpportunity {
/// A triple pattern whose object is a text-like literal.
TextSimilarity {
/// The matching triple pattern.
pattern: TriplePattern,
/// Estimated number of matches (a fixed 100 in the visible analyzer).
estimated_matches: usize,
},
/// A triple pattern whose predicate IRI suggests entity similarity.
EntitySimilarity {
/// The matching triple pattern.
pattern: TriplePattern,
/// Kind of similarity (always `Conceptual` in the visible analyzer).
similarity_type: EntitySimilarityType,
},
/// A triple pattern whose predicate is a variable or a topic-like IRI.
PropertyExpansion {
/// The matching triple pattern.
pattern: TriplePattern,
/// Expansion depth (a fixed 2 in the visible analyzer).
expansion_depth: usize,
},
/// A filter whose condition is a function call with a semantic-looking name.
SemanticFilter {
/// The filter condition.
condition: Expression,
/// Estimated selectivity (a fixed 0.1 in the visible analyzer).
estimated_selectivity: f32,
},
/// A join whose operands share a variable.
VectorJoin {
/// Clone of the left join operand.
left_pattern: Box<Algebra>,
/// Clone of the right join operand.
right_pattern: Box<Algebra>,
/// The shared variable the opportunity was detected on.
join_variable: Variable,
/// Estimated selectivity (a fixed 0.2 in the visible analyzer).
estimated_selectivity: f32,
},
}
/// Kind of similarity attached to an `EntitySimilarity` opportunity.
///
/// Only `Conceptual` is produced by the code visible in this file.
#[derive(Debug, Clone, Copy)]
pub enum EntitySimilarityType {
/// Conceptual similarity; the value assigned by `analyze_bgp_patterns`.
Conceptual,
/// Taxonomic similarity.
Taxonomic,
/// Relational similarity.
Relational,
/// Contextual similarity.
Contextual,
}
/// Recommendation describing a vector index that could be created.
///
/// Not constructed or consumed by any code visible in this file; the field
/// notes below are inferred from names and types only — TODO confirm.
#[derive(Debug, Clone)]
pub struct VectorIndexRecommendation {
/// Index type being recommended.
pub recommended_type: VectorIndexType,
/// Estimated benefit of creating the index; scale not shown here.
pub estimated_benefit: f32,
/// Estimated time needed to create the index.
pub creation_cost_estimate: Duration,
/// Memory requirement; presumably in bytes.
pub memory_requirement: usize,
/// Maintenance overhead; scale not shown here.
pub maintenance_overhead: f32,
}