use crate::ast::ast::{MatchClause, Query};
use crate::plan::logical::LogicalPlan;
use crate::plan::pattern_optimization::{
cost_estimation::StatisticsManager,
logical_integration::{
LogicalPatternOptimizer, OptimizationContext, PatternOptimizationResult,
},
physical_generation::{OptimizationImprovement, PhysicalGenerationConfig},
};
use crate::plan::physical::PhysicalNode;
/// Runs pattern-level optimization on MATCH clauses that contain more than one
/// (comma-separated) pattern, and keeps running metrics about the outcome.
///
/// The pipeline is stateful: `optimize_query` takes `&mut self` and records each
/// processed query in `metrics`.
#[derive(Debug)]
pub struct PatternOptimizationPipeline {
// Performs the logical-level rewrite of each multi-pattern MATCH clause.
logical_optimizer: LogicalPatternOptimizer,
// Only read by `record_query_execution`, which is itself marked `#[allow(dead_code)]`;
// presumably this allow silences the warning while that method goes unused.
#[allow(dead_code)]
statistics_manager: StatisticsManager,
// Switches controlling enablement, logging, error fallback and the improvement threshold.
config: IntegrationConfig,
// Counters updated by `optimize_query` (see `OptimizationMetrics`).
metrics: OptimizationMetrics,
}
/// Settings that control how `PatternOptimizationPipeline` behaves.
#[derive(Debug, Clone)]
pub struct IntegrationConfig {
    /// Master switch; when `false` the pipeline hands back the input plan untouched.
    pub enable_optimization: bool,
    /// Emit `log::debug!` lines for every clause rewrite that is applied.
    pub enable_logging: bool,
    /// When a single clause fails to optimize, record the failure and continue
    /// instead of returning an error for the whole query.
    pub fallback_on_error: bool,
    /// Minimum combined improvement required to keep a rewrite; forwarded to
    /// `OptimizationImprovement::is_significant_improvement` (presumably in percent —
    /// the improvement percentages elsewhere use a 0–100 scale).
    pub min_improvement_threshold: f64,
}

impl Default for IntegrationConfig {
    /// Optimization on, logging off, fallback on errors, threshold of 10.0.
    fn default() -> Self {
        let min_improvement_threshold = 10.0;
        IntegrationConfig {
            min_improvement_threshold,
            fallback_on_error: true,
            enable_logging: false,
            enable_optimization: true,
        }
    }
}
/// Running counters describing how the pattern-optimization pipeline has performed.
#[derive(Debug, Clone, Default)]
pub struct OptimizationMetrics {
    /// Every query with a recorded outcome, whether optimized or skipped.
    pub queries_processed: u64,
    /// Queries recorded through [`OptimizationMetrics::record_success`].
    pub queries_optimized: u64,
    /// Sum of all recorded optimization times, in milliseconds.
    pub total_optimization_time_ms: u64,
    /// `total_optimization_time_ms / queries_processed`, refreshed on every record.
    pub avg_optimization_time_ms: f64,
    /// `queries_optimized / queries_processed` as a fraction (not a percentage).
    pub success_rate: f64,
    /// Sum of `cost_reduction_percentage` over all successful queries.
    pub total_cost_improvement: f64,
    /// Sum of `cardinality_reduction_percentage` over all successful queries.
    pub total_cardinality_reduction: f64,
}
impl OptimizationMetrics {
    /// Records a query whose optimized plan was accepted, including its improvement.
    pub fn record_success(
        &mut self,
        optimization_time_ms: u64,
        improvement: &OptimizationImprovement,
    ) {
        // Bump the success counter first so the recomputed success rate includes it.
        self.queries_optimized += 1;
        self.count_attempt(optimization_time_ms);
        self.total_cost_improvement += improvement.cost_reduction_percentage;
        self.total_cardinality_reduction += improvement.cardinality_reduction_percentage;
    }

    /// Records a query that was processed but left unoptimized.
    pub fn record_skip(&mut self, optimization_time_ms: u64) {
        self.count_attempt(optimization_time_ms);
    }

    /// Mean cost reduction (percent) over successful queries; `0.0` if there were none.
    pub fn avg_cost_improvement(&self) -> f64 {
        Self::mean(self.total_cost_improvement, self.queries_optimized)
    }

    /// Mean cardinality reduction (percent) over successful queries; `0.0` if there were none.
    pub fn avg_cardinality_reduction(&self) -> f64 {
        Self::mean(self.total_cardinality_reduction, self.queries_optimized)
    }

    /// Counts one processed query and refreshes the derived averages.
    fn count_attempt(&mut self, optimization_time_ms: u64) {
        self.queries_processed += 1;
        self.total_optimization_time_ms += optimization_time_ms;
        let processed = self.queries_processed as f64;
        self.avg_optimization_time_ms = self.total_optimization_time_ms as f64 / processed;
        self.success_rate = (self.queries_optimized as f64) / processed;
    }

    /// `total / count`, or `0.0` when `count` is zero.
    fn mean(total: f64, count: u64) -> f64 {
        if count == 0 {
            0.0
        } else {
            total / count as f64
        }
    }
}
impl PatternOptimizationPipeline {
    /// Creates a pipeline with `IntegrationConfig::default()` and zeroed metrics.
    pub fn new() -> Self {
        Self::with_config(IntegrationConfig::default())
    }

    /// Creates a pipeline that uses `config` instead of the default settings.
    #[allow(dead_code)] // NOTE(review): likely redundant now that `new` delegates here
    pub fn with_config(config: IntegrationConfig) -> Self {
        Self {
            logical_optimizer: LogicalPatternOptimizer::new(),
            statistics_manager: StatisticsManager::new(),
            config,
            metrics: OptimizationMetrics::default(),
        }
    }

    /// Optimizes every MATCH clause of `query` that has more than one pattern.
    ///
    /// Clauses are processed in order. Each clause is optimized on top of the plan
    /// produced for the previous one. The rewritten plan is returned only if at least
    /// one clause was optimized and the combined improvement passes
    /// `config.min_improvement_threshold`. Otherwise `original_physical_plan` is
    /// returned unchanged, with an explanation.
    ///
    /// Every call is recorded in the metrics, except two: calls made while optimization
    /// is disabled, and calls that return an error.
    ///
    /// # Errors
    /// Propagates failures from MATCH-clause extraction and context creation. If
    /// `config.fallback_on_error` is `false`, it also propagates the first per-clause
    /// optimization failure.
    pub fn optimize_query(
        &mut self,
        query: &Query,
        _original_logical_plan: &LogicalPlan,
        original_physical_plan: &PhysicalNode,
    ) -> Result<OptimizationPipelineResult, String> {
        if !self.config.enable_optimization {
            // Disabled runs are deliberately not counted in the metrics.
            return Ok(OptimizationPipelineResult::no_optimization(
                original_physical_plan.clone(),
                "Pattern optimization is disabled".to_string(),
            ));
        }
        let start_time = std::time::Instant::now();
        let match_clauses = self.extract_match_clauses(query)?;
        if match_clauses.is_empty() {
            return Ok(self.skip_optimization(
                start_time,
                original_physical_plan,
                "No MATCH clauses found in query",
            ));
        }
        // Only clauses with several comma-separated patterns have anything to optimize.
        if !match_clauses.iter().any(|clause| clause.patterns.len() > 1) {
            return Ok(self.skip_optimization(
                start_time,
                original_physical_plan,
                "No comma-separated patterns found",
            ));
        }
        let context = self.create_optimization_context(query)?;
        let mut optimized_plan = original_physical_plan.clone();
        // Improvement of `optimized_plan` over `original_physical_plan`; stays `None`
        // until some clause rewrite is actually applied.
        let mut total_improvement: Option<OptimizationImprovement> = None;
        let mut optimization_reasons = Vec::new();
        for match_clause in match_clauses
            .iter()
            .filter(|clause| clause.patterns.len() > 1)
        {
            match self.optimize_single_match_clause(match_clause, &optimized_plan, &context) {
                Ok(SingleMatchOptimizationResult {
                    optimization_result,
                    optimized_physical_plan,
                    improvement,
                }) => {
                    if optimization_result.optimized {
                        if self.config.enable_logging {
                            log::debug!(
                                "✅ Pattern optimization applied: {}",
                                optimization_result.optimization_reason
                            );
                            log::debug!(" Improvement: {}", improvement.describe());
                        }
                        optimized_plan = optimized_physical_plan;
                        // `improvement` is measured against the previous step's plan,
                        // so it must be composed with, not added to, the running total.
                        total_improvement = Some(match total_improvement.take() {
                            Some(previous) => self.combine_improvements(&previous, &improvement),
                            None => improvement,
                        });
                    }
                    optimization_reasons.push(optimization_result.optimization_reason.clone());
                }
                Err(e) => {
                    if self.config.fallback_on_error {
                        optimization_reasons
                            .push(format!("Optimization failed, using fallback: {}", e));
                    } else {
                        return Err(format!("Pattern optimization failed: {}", e));
                    }
                }
            }
        }
        let elapsed = start_time.elapsed().as_millis() as u64;
        let threshold = self.config.min_improvement_threshold;
        match total_improvement {
            Some(improvement) if improvement.is_significant_improvement(threshold) => {
                self.metrics.record_success(elapsed, &improvement);
                Ok(OptimizationPipelineResult::optimized(
                    optimized_plan,
                    improvement,
                    optimization_reasons.join("; "),
                ))
            }
            _ => {
                self.metrics.record_skip(elapsed);
                Ok(OptimizationPipelineResult::no_optimization(
                    original_physical_plan.clone(),
                    format!(
                        "Optimization not beneficial: {}",
                        optimization_reasons.join("; ")
                    ),
                ))
            }
        }
    }

    /// Records a skipped attempt, timed from `start_time`, and returns the original
    /// plan with `reason` as the explanation.
    fn skip_optimization(
        &mut self,
        start_time: std::time::Instant,
        original_physical_plan: &PhysicalNode,
        reason: &str,
    ) -> OptimizationPipelineResult {
        let elapsed = start_time.elapsed().as_millis() as u64;
        self.metrics.record_skip(elapsed);
        OptimizationPipelineResult::no_optimization(
            original_physical_plan.clone(),
            reason.to_string(),
        )
    }

    /// Optimizes one MATCH clause and estimates its effect relative to `base_physical_plan`.
    ///
    /// Steps:
    /// 1. Run the logical optimizer on the clause.
    /// 2. Generate a physical plan from the logical result, with the default
    ///    `PhysicalGenerationConfig`.
    /// 3. Estimate the improvement of that plan over `base_physical_plan`.
    ///
    /// # Errors
    /// Propagates errors from the logical optimization and physical generation steps.
    fn optimize_single_match_clause(
        &mut self,
        match_clause: &MatchClause,
        base_physical_plan: &PhysicalNode,
        context: &OptimizationContext,
    ) -> Result<SingleMatchOptimizationResult, String> {
        let logical_result =
            crate::plan::pattern_optimization::logical_integration::optimize_match_clause_patterns(
                &mut self.logical_optimizer,
                match_clause,
                context,
            )?;
        let optimized_physical_plan = crate::plan::pattern_optimization::physical_generation::generate_optimized_physical_plan(
            &logical_result,
            base_physical_plan.clone(),
            Some(PhysicalGenerationConfig::default()),
        )?;
        let improvement = crate::plan::pattern_optimization::physical_generation::estimate_optimization_improvement(
            base_physical_plan,
            &optimized_physical_plan,
        );
        Ok(SingleMatchOptimizationResult {
            optimization_result: logical_result,
            optimized_physical_plan,
            improvement,
        })
    }

    /// Collects the MATCH clauses reachable from `query`.
    ///
    /// - Basic queries contribute their single clause.
    /// - Set operations contribute the left side's clauses, then the right side's.
    /// - `Limited` recurses into the wrapped query.
    /// - Every other query form contributes nothing, so any MATCH clauses they may
    ///   contain are not optimized here.
    fn extract_match_clauses(&self, query: &Query) -> Result<Vec<MatchClause>, String> {
        match query {
            Query::Basic(basic_query) => Ok(vec![basic_query.match_clause.clone()]),
            Query::SetOperation(set_op) => {
                let mut match_clauses = self.extract_match_clauses(&set_op.left)?;
                match_clauses.extend(self.extract_match_clauses(&set_op.right)?);
                Ok(match_clauses)
            }
            Query::Limited {
                query: inner_query, ..
            } => self.extract_match_clauses(inner_query),
            Query::WithQuery(_)
            | Query::MutationPipeline(_)
            | Query::Let(_)
            | Query::For(_)
            | Query::Filter(_)
            | Query::Return(_)
            | Query::Unwind(_) => Ok(Vec::new()),
        }
    }

    /// Builds the optimization context. `_query` is currently ignored, and the
    /// default context is always returned.
    fn create_optimization_context(&self, _query: &Query) -> Result<OptimizationContext, String> {
        Ok(OptimizationContext::default())
    }

    /// Composes two *sequential* improvements into one.
    ///
    /// `improvement1` measures plan P0 → P1. `improvement2` measures P1 → P2, because
    /// each clause is optimized on top of the previous clause's output. The result
    /// measures P0 → P2:
    /// - the original cost and rows come from the first step;
    /// - the optimized cost and rows come from the second step;
    /// - the relative reductions (0–100 scale) compound rather than add.
    fn combine_improvements(
        &self,
        improvement1: &OptimizationImprovement,
        improvement2: &OptimizationImprovement,
    ) -> OptimizationImprovement {
        // Two 60% cuts leave 40% of 40% = 16% of the original, i.e. an 84% reduction.
        let compound =
            |first: f64, second: f64| 100.0 - (100.0 - first) * (100.0 - second) / 100.0;
        OptimizationImprovement {
            cost_reduction_percentage: compound(
                improvement1.cost_reduction_percentage,
                improvement2.cost_reduction_percentage,
            ),
            cardinality_reduction_percentage: compound(
                improvement1.cardinality_reduction_percentage,
                improvement2.cardinality_reduction_percentage,
            ),
            original_cost: improvement1.original_cost,
            optimized_cost: improvement2.optimized_cost,
            original_rows: improvement1.original_rows,
            optimized_rows: improvement2.optimized_rows,
        }
    }

    /// Returns the metrics accumulated so far.
    pub fn get_metrics(&self) -> &OptimizationMetrics {
        &self.metrics
    }

    /// Resets every metric counter to zero.
    #[allow(dead_code)] // NOTE(review): presumably kept for callers outside this file; confirm
    pub fn reset_metrics(&mut self) {
        self.metrics = OptimizationMetrics::default();
    }

    /// Replaces the whole configuration with `config`.
    pub fn update_config(&mut self, config: IntegrationConfig) {
        self.config = config;
    }

    /// Forwards an executed query and its result count to the statistics manager.
    #[allow(dead_code)] // NOTE(review): presumably kept for callers outside this file; confirm
    pub fn record_query_execution(&mut self, query_text: &str, result_count: u64) {
        self.statistics_manager
            .record_query_execution(query_text, result_count);
    }
}
/// Outcome of running the pipeline on one query: the plan to execute, plus whether
/// and why it changed.
#[derive(Debug, Clone)]
pub struct OptimizationPipelineResult {
    /// Plan the caller should execute (rewritten, or the untouched original).
    pub physical_plan: PhysicalNode,
    /// `true` when built by [`OptimizationPipelineResult::optimized`].
    pub optimized: bool,
    /// Estimated gain; zeroed by [`OptimizationPipelineResult::no_optimization`].
    pub improvement: OptimizationImprovement,
    /// Human-readable reason(s) for the outcome.
    pub explanation: String,
}
impl OptimizationPipelineResult {
    /// Wraps a rewritten plan together with its estimated improvement.
    pub fn optimized(
        physical_plan: PhysicalNode,
        improvement: OptimizationImprovement,
        explanation: String,
    ) -> Self {
        OptimizationPipelineResult {
            optimized: true,
            explanation,
            improvement,
            physical_plan,
        }
    }

    /// Wraps an unchanged plan; every improvement figure is zero.
    pub fn no_optimization(physical_plan: PhysicalNode, explanation: String) -> Self {
        let zeroed = OptimizationImprovement {
            original_cost: 0.0,
            optimized_cost: 0.0,
            original_rows: 0,
            optimized_rows: 0,
            cost_reduction_percentage: 0.0,
            cardinality_reduction_percentage: 0.0,
        };
        Self {
            optimized: false,
            ..Self::optimized(physical_plan, zeroed, explanation)
        }
    }
}

/// Result of optimizing a single MATCH clause, before the per-clause results are combined.
#[derive(Debug, Clone)]
struct SingleMatchOptimizationResult {
    /// Logical-level outcome, including whether a rewrite happened and the reason.
    pub optimization_result: PatternOptimizationResult,
    /// Physical plan generated from the logical result.
    pub optimized_physical_plan: PhysicalNode,
    /// Estimated gain of `optimized_physical_plan` over the plan it was generated from.
    pub improvement: OptimizationImprovement,
}
/// Wraps a [`PatternOptimizationPipeline`] behind a simple on/off switch.
///
/// Pipeline errors never escape: [`GlobalPatternOptimizer::optimize_query`] logs them
/// at debug level and falls back to the input plan.
#[allow(dead_code)] // NOTE(review): reason for this allow is not visible here; confirm
#[derive(Debug)]
pub struct GlobalPatternOptimizer {
    pipeline: PatternOptimizationPipeline,
    // Mirrors `pipeline.config.enable_optimization`; `set_enabled` updates both.
    enabled: bool,
}
impl GlobalPatternOptimizer {
    /// Creates an enabled optimizer around a default-configured pipeline.
    pub fn new() -> Self {
        Self {
            pipeline: PatternOptimizationPipeline::new(),
            enabled: true,
        }
    }

    /// Turns pattern optimization on or off.
    ///
    /// Only the pipeline's `enable_optimization` flag changes. Its other settings
    /// (logging, error fallback, improvement threshold) are preserved rather than
    /// being reset to their defaults.
    pub fn set_enabled(&mut self, enabled: bool) {
        self.enabled = enabled;
        let mut config = self.pipeline.config.clone();
        config.enable_optimization = enabled;
        self.pipeline.update_config(config);
    }

    /// Reports whether optimization is currently switched on.
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Optimizes `physical_plan` when enabled; otherwise returns a clone of it.
    ///
    /// This never returns `Err`. Pipeline errors are logged at debug level, and the
    /// input plan is returned instead.
    pub fn optimize_query(
        &mut self,
        query: &Query,
        logical_plan: &LogicalPlan,
        physical_plan: &PhysicalNode,
    ) -> Result<PhysicalNode, String> {
        if !self.enabled {
            return Ok(physical_plan.clone());
        }
        match self
            .pipeline
            .optimize_query(query, logical_plan, physical_plan)
        {
            Ok(result) => Ok(result.physical_plan),
            Err(e) => {
                log::debug!("Pattern optimization error: {}", e);
                Ok(physical_plan.clone())
            }
        }
    }

    /// Returns the wrapped pipeline's metrics.
    pub fn get_metrics(&self) -> &OptimizationMetrics {
        self.pipeline.get_metrics()
    }

    /// Writes a summary of the pipeline metrics at `debug` log level.
    pub fn print_statistics(&self) {
        let metrics = self.get_metrics();
        log::debug!("\n🔍 Pattern Optimization Statistics:");
        log::debug!(" Queries Processed: {}", metrics.queries_processed);
        log::debug!(" Queries Optimized: {}", metrics.queries_optimized);
        // `success_rate` is stored as a fraction; shown as a percentage.
        log::debug!(" Success Rate: {:.1}%", metrics.success_rate * 100.0);
        log::debug!(
            " Avg Optimization Time: {:.2}ms",
            metrics.avg_optimization_time_ms
        );
        log::debug!(
            " Avg Cost Improvement: {:.1}%",
            metrics.avg_cost_improvement()
        );
        log::debug!(
            " Avg Cardinality Reduction: {:.1}%",
            metrics.avg_cardinality_reduction()
        );
        if metrics.queries_optimized > 0 {
            log::debug!(
                " 🎉 Pattern optimization is working! Comma-separated pattern bug is fixed."
            );
        }
    }
}
#[allow(dead_code)]
pub mod integration_helpers {
    //! Free-function facade over a single process-wide [`GlobalPatternOptimizer`].
    //!
    //! Every entry point locks one `Mutex`. If that mutex is poisoned (a previous
    //! holder panicked), the helpers treat optimization as unavailable:
    //! - toggles are ignored, and nothing is logged;
    //! - `is_pattern_optimization_enabled` reports `false`;
    //! - `optimize_query_global` returns the input plan unchanged;
    //! - statistics are not printed.
    use super::*;
    use once_cell::sync::Lazy;
    use std::sync::Mutex;

    #[allow(dead_code)]
    static GLOBAL_OPTIMIZER: Lazy<Mutex<GlobalPatternOptimizer>> =
        Lazy::new(|| Mutex::new(GlobalPatternOptimizer::new()));

    /// Runs `action` on the shared optimizer, or returns `None` if the lock is poisoned.
    fn with_optimizer<R>(action: impl FnOnce(&mut GlobalPatternOptimizer) -> R) -> Option<R> {
        let mut guard = GLOBAL_OPTIMIZER.lock().ok()?;
        Some(action(&mut *guard))
    }

    /// Switches the shared optimizer on.
    #[allow(dead_code)]
    pub fn enable_pattern_optimization() {
        if with_optimizer(|optimizer| optimizer.set_enabled(true)).is_some() {
            log::debug!(
                "✅ Pattern optimization enabled - comma-separated pattern bug fix is active"
            );
        }
    }

    /// Switches the shared optimizer off.
    pub fn disable_pattern_optimization() {
        if with_optimizer(|optimizer| optimizer.set_enabled(false)).is_some() {
            log::debug!("❌ Pattern optimization disabled");
        }
    }

    /// Reports whether the shared optimizer is on (`false` if the lock is poisoned).
    pub fn is_pattern_optimization_enabled() -> bool {
        with_optimizer(|optimizer| optimizer.is_enabled()).unwrap_or(false)
    }

    /// Runs the shared optimizer on `physical_plan`.
    ///
    /// If the lock is poisoned, a clone of `physical_plan` is returned instead.
    pub fn optimize_query_global(
        query: &Query,
        logical_plan: &LogicalPlan,
        physical_plan: &PhysicalNode,
    ) -> Result<PhysicalNode, String> {
        with_optimizer(|optimizer| optimizer.optimize_query(query, logical_plan, physical_plan))
            .unwrap_or_else(|| Ok(physical_plan.clone()))
    }

    /// Logs the shared optimizer's statistics at `debug` level.
    pub fn print_global_statistics() {
        with_optimizer(|optimizer| optimizer.print_statistics());
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_creation() {
        let fresh = PatternOptimizationPipeline::new();
        assert_eq!(fresh.metrics.queries_processed, 0);
        assert!(fresh.config.enable_optimization);
    }

    #[test]
    fn test_pipeline_with_config() {
        let custom = IntegrationConfig {
            min_improvement_threshold: 20.0,
            fallback_on_error: false,
            enable_logging: true,
            enable_optimization: false,
        };
        let pipeline = PatternOptimizationPipeline::with_config(custom);
        let cfg = &pipeline.config;
        assert!(!cfg.enable_optimization);
        assert!(cfg.enable_logging);
        assert!(!cfg.fallback_on_error);
        assert_eq!(cfg.min_improvement_threshold, 20.0);
    }

    #[test]
    fn test_optimization_metrics() {
        let sample = OptimizationImprovement {
            original_cost: 1000.0,
            optimized_cost: 150.0,
            original_rows: 18,
            optimized_rows: 3,
            cost_reduction_percentage: 85.0,
            cardinality_reduction_percentage: 83.3,
        };
        let mut metrics = OptimizationMetrics::default();
        metrics.record_success(50, &sample);
        assert_eq!(
            (metrics.queries_processed, metrics.queries_optimized),
            (1, 1)
        );
        assert_eq!(metrics.success_rate, 1.0);
        assert_eq!(metrics.avg_optimization_time_ms, 50.0);
        assert_eq!(metrics.avg_cost_improvement(), 85.0);
        assert_eq!(metrics.avg_cardinality_reduction(), 83.3);
    }

    #[test]
    fn test_global_optimizer() {
        let mut optimizer = GlobalPatternOptimizer::new();
        assert!(optimizer.is_enabled());
        for &state in &[false, true] {
            optimizer.set_enabled(state);
            assert_eq!(optimizer.is_enabled(), state);
        }
    }

    #[test]
    fn test_integration_helpers() {
        use super::integration_helpers::{
            disable_pattern_optimization, enable_pattern_optimization,
            is_pattern_optimization_enabled,
        };
        enable_pattern_optimization();
        assert!(is_pattern_optimization_enabled());
        disable_pattern_optimization();
        assert!(!is_pattern_optimization_enabled());
        // Leave the process-wide optimizer enabled, as it started.
        enable_pattern_optimization();
    }

    #[test]
    fn test_optimization_pipeline_result() {
        let scan = PhysicalNode::NodeSeqScan {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            properties: None,
            estimated_rows: 100,
            estimated_cost: 50.0,
        };
        let gain = OptimizationImprovement {
            original_cost: 500.0,
            optimized_cost: 50.0,
            original_rows: 18,
            optimized_rows: 3,
            cost_reduction_percentage: 90.0,
            cardinality_reduction_percentage: 83.3,
        };

        let applied = OptimizationPipelineResult::optimized(
            scan.clone(),
            gain,
            "Path traversal optimization applied".to_string(),
        );
        assert!(applied.optimized);
        assert_eq!(applied.improvement.cost_reduction_percentage, 90.0);
        assert!(applied.explanation.contains("Path traversal"));

        let untouched = OptimizationPipelineResult::no_optimization(
            scan,
            "No optimization needed".to_string(),
        );
        assert!(!untouched.optimized);
        assert_eq!(untouched.improvement.cost_reduction_percentage, 0.0);
    }
}