Skip to main content

oxirs_core/query/
advanced_statistics.rs

1//! Advanced Query Statistics and Pattern Analysis
2//!
3//! This module provides sophisticated statistical analysis for query optimization,
4//! including histogram-based cardinality estimation, correlation detection,
5//! and adaptive learning from query execution history.
6
7use crate::query::algebra::{AlgebraTriplePattern, GraphPattern, TermPattern};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, RwLock};
11
12/// Advanced statistics collector with histogram support
13#[derive(Debug)]
14pub struct AdvancedStatisticsCollector {
15    /// Subject cardinality histogram
16    subject_histogram: Arc<RwLock<CardinalityHistogram>>,
17    /// Predicate cardinality histogram
18    predicate_histogram: Arc<RwLock<CardinalityHistogram>>,
19    /// Object cardinality histogram
20    object_histogram: Arc<RwLock<CardinalityHistogram>>,
21    /// Join selectivity estimates
22    join_selectivity: Arc<RwLock<JoinSelectivityEstimator>>,
23    /// Pattern execution history
24    execution_history: Arc<RwLock<ExecutionHistory>>,
25    /// Total queries analyzed
26    queries_analyzed: AtomicU64,
27}
28
29impl AdvancedStatisticsCollector {
30    /// Create a new advanced statistics collector
31    pub fn new() -> Self {
32        Self {
33            subject_histogram: Arc::new(RwLock::new(CardinalityHistogram::new())),
34            predicate_histogram: Arc::new(RwLock::new(CardinalityHistogram::new())),
35            object_histogram: Arc::new(RwLock::new(CardinalityHistogram::new())),
36            join_selectivity: Arc::new(RwLock::new(JoinSelectivityEstimator::new())),
37            execution_history: Arc::new(RwLock::new(ExecutionHistory::new(1000))),
38            queries_analyzed: AtomicU64::new(0),
39        }
40    }
41
42    /// Record execution of a triple pattern
43    pub fn record_pattern_execution(
44        &self,
45        pattern: &AlgebraTriplePattern,
46        actual_cardinality: usize,
47        execution_time_ms: u64,
48    ) {
49        // Update histograms based on pattern terms
50        self.update_histograms(pattern, actual_cardinality);
51
52        // Record in execution history
53        let mut history = self
54            .execution_history
55            .write()
56            .expect("execution_history lock poisoned");
57        history.record(PatternExecution {
58            pattern: pattern.clone(),
59            cardinality: actual_cardinality,
60            execution_time_ms,
61            timestamp: std::time::SystemTime::now(),
62        });
63
64        self.queries_analyzed.fetch_add(1, Ordering::Relaxed);
65    }
66
67    /// Update cardinality histograms
68    fn update_histograms(&self, pattern: &AlgebraTriplePattern, cardinality: usize) {
69        // Update subject histogram if bound
70        if let TermPattern::NamedNode(node) = &pattern.subject {
71            let mut hist = self
72                .subject_histogram
73                .write()
74                .expect("subject_histogram lock poisoned");
75            hist.record(node.as_str(), cardinality);
76        }
77
78        // Update predicate histogram if bound
79        if let TermPattern::NamedNode(node) = &pattern.predicate {
80            let mut hist = self
81                .predicate_histogram
82                .write()
83                .expect("predicate_histogram lock poisoned");
84            hist.record(node.as_str(), cardinality);
85        }
86
87        // Update object histogram if bound
88        if let TermPattern::NamedNode(node) = &pattern.object {
89            let mut hist = self
90                .object_histogram
91                .write()
92                .expect("object_histogram lock poisoned");
93            hist.record(node.as_str(), cardinality);
94        }
95    }
96
97    /// Estimate cardinality for a pattern using histograms
98    pub fn estimate_cardinality(&self, pattern: &AlgebraTriplePattern) -> Option<usize> {
99        // Use histogram data if available
100        let subject_est = if let TermPattern::NamedNode(node) = &pattern.subject {
101            self.subject_histogram
102                .read()
103                .expect("subject_histogram lock poisoned")
104                .estimate(node.as_str())
105        } else {
106            None
107        };
108
109        let predicate_est = if let TermPattern::NamedNode(node) = &pattern.predicate {
110            self.predicate_histogram
111                .read()
112                .expect("predicate_histogram lock poisoned")
113                .estimate(node.as_str())
114        } else {
115            None
116        };
117
118        let object_est = if let TermPattern::NamedNode(node) = &pattern.object {
119            self.object_histogram
120                .read()
121                .expect("object_histogram lock poisoned")
122                .estimate(node.as_str())
123        } else {
124            None
125        };
126
127        // Combine estimates using minimum (most selective)
128        [subject_est, predicate_est, object_est]
129            .iter()
130            .filter_map(|&x| x)
131            .min()
132    }
133
134    /// Record join execution for selectivity learning
135    pub fn record_join_execution(
136        &self,
137        _left_pattern: &GraphPattern,
138        _right_pattern: &GraphPattern,
139        left_cardinality: usize,
140        right_cardinality: usize,
141        result_cardinality: usize,
142    ) {
143        let mut estimator = self
144            .join_selectivity
145            .write()
146            .expect("join_selectivity lock poisoned");
147        estimator.record_join(left_cardinality, right_cardinality, result_cardinality);
148    }
149
150    /// Estimate join selectivity
151    pub fn estimate_join_selectivity(&self, left_card: usize, right_card: usize) -> f64 {
152        self.join_selectivity
153            .read()
154            .expect("join_selectivity lock poisoned")
155            .estimate(left_card, right_card)
156    }
157
158    /// Get execution history for a pattern
159    pub fn get_pattern_history(&self, pattern: &AlgebraTriplePattern) -> Vec<PatternExecution> {
160        self.execution_history
161            .read()
162            .expect("execution_history lock poisoned")
163            .get_similar_patterns(pattern)
164    }
165
166    /// Get overall statistics
167    pub fn get_statistics(&self) -> AdvancedStatistics {
168        AdvancedStatistics {
169            queries_analyzed: self.queries_analyzed.load(Ordering::Relaxed),
170            subject_histogram_size: self
171                .subject_histogram
172                .read()
173                .expect("subject_histogram lock poisoned")
174                .size(),
175            predicate_histogram_size: self
176                .predicate_histogram
177                .read()
178                .expect("predicate_histogram lock poisoned")
179                .size(),
180            object_histogram_size: self
181                .object_histogram
182                .read()
183                .expect("object_histogram lock poisoned")
184                .size(),
185            join_samples: self
186                .join_selectivity
187                .read()
188                .expect("join_selectivity lock poisoned")
189                .sample_count(),
190            history_size: self
191                .execution_history
192                .read()
193                .expect("execution_history lock poisoned")
194                .size(),
195        }
196    }
197
198    /// Clear all statistics (useful for testing)
199    pub fn clear(&self) {
200        self.subject_histogram
201            .write()
202            .expect("subject_histogram lock poisoned")
203            .clear();
204        self.predicate_histogram
205            .write()
206            .expect("predicate_histogram lock poisoned")
207            .clear();
208        self.object_histogram
209            .write()
210            .expect("object_histogram lock poisoned")
211            .clear();
212        self.join_selectivity
213            .write()
214            .expect("join_selectivity lock poisoned")
215            .clear();
216        self.execution_history
217            .write()
218            .expect("execution_history lock poisoned")
219            .clear();
220        self.queries_analyzed.store(0, Ordering::Relaxed);
221    }
222}
223
224impl Default for AdvancedStatisticsCollector {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
/// Per-term sliding window of observed result cardinalities.
///
/// For every term the most recent `max_samples` observations are kept; the
/// estimate for a term is the median of its window.
#[derive(Debug)]
struct CardinalityHistogram {
    /// Term -> observed cardinalities, oldest first
    data: HashMap<String, std::collections::VecDeque<usize>>,
    /// Maximum samples per term
    max_samples: usize,
}

impl CardinalityHistogram {
    /// Create an empty histogram that keeps the last 100 observations per term.
    fn new() -> Self {
        Self {
            data: HashMap::new(),
            max_samples: 100,
        }
    }

    /// Append `cardinality` to the window of `term`, evicting the oldest
    /// observation(s) once the window exceeds `max_samples`.
    fn record(&mut self, term: &str, cardinality: usize) {
        let max_samples = self.max_samples;
        let samples = self.data.entry(term.to_string()).or_default();
        samples.push_back(cardinality);

        // A deque drops the oldest sample in O(1); `Vec::remove(0)` would
        // shift the whole window on every insertion.
        while samples.len() > max_samples {
            samples.pop_front();
        }
    }

    /// Median of the observations recorded for `term` (the upper median when
    /// the window holds an even number of samples), or `None` when nothing is
    /// recorded for it.
    fn estimate(&self, term: &str) -> Option<usize> {
        let samples = self.data.get(term)?;
        if samples.is_empty() {
            return None;
        }

        // Selection finds the element a full sort would place at `len / 2`
        // without sorting the rest of the window.
        let mut scratch: Vec<usize> = samples.iter().copied().collect();
        let mid = scratch.len() / 2;
        let (_, median, _) = scratch.select_nth_unstable(mid);
        Some(*median)
    }

    /// Number of distinct terms with recorded observations.
    fn size(&self) -> usize {
        self.data.len()
    }

    /// Forget all terms and their observations.
    fn clear(&mut self) {
        self.data.clear();
    }
}
278
/// Join selectivity estimator that learns from a sliding window of observed joins.
#[derive(Debug)]
struct JoinSelectivityEstimator {
    /// Observed joins, oldest first
    observations: std::collections::VecDeque<JoinObservation>,
    /// Maximum observations to keep
    max_observations: usize,
}

/// One observed join: input sizes, output size and the derived selectivity.
#[derive(Debug, Clone)]
// `result_cardinality` is stored but never read by the estimator; it only
// shows up in `Debug` output, hence the lint exemption.
#[allow(dead_code)]
struct JoinObservation {
    left_cardinality: usize,
    right_cardinality: usize,
    result_cardinality: usize,
    /// `result_cardinality / (left_cardinality * right_cardinality)`, or `0.0`
    /// when the product is zero
    selectivity: f64,
}

impl JoinSelectivityEstimator {
    /// Create an empty estimator that keeps the last 1000 observations.
    fn new() -> Self {
        Self {
            observations: std::collections::VecDeque::new(),
            max_observations: 1000,
        }
    }

    /// Store one observed join and its selectivity, evicting the oldest
    /// observation(s) once the window exceeds `max_observations`.
    fn record_join(&mut self, left_card: usize, right_card: usize, result_card: usize) {
        let product = (left_card as f64) * (right_card as f64);
        // An empty input gives a zero product; report selectivity 0 instead of
        // dividing by zero.
        let selectivity = if product > 0.0 {
            (result_card as f64) / product
        } else {
            0.0
        };

        self.observations.push_back(JoinObservation {
            left_cardinality: left_card,
            right_cardinality: right_card,
            result_cardinality: result_card,
            selectivity,
        });

        // A deque drops the oldest observation in O(1); `Vec::remove(0)` would
        // shift up to `max_observations` entries on every insertion.
        while self.observations.len() > self.max_observations {
            self.observations.pop_front();
        }
    }

    /// Estimate the selectivity of a join between inputs of the given sizes.
    ///
    /// * With no observations at all, the default `0.1` is returned.
    /// * Otherwise the median selectivity of "similar" observations is used,
    ///   i.e. those whose left and right cardinalities are both within a
    ///   factor of two of the requested ones (a requested size of 0 is treated
    ///   as 1 for this comparison).
    /// * If no observation is similar, the mean over all observations is
    ///   returned.
    fn estimate(&self, left_card: usize, right_card: usize) -> f64 {
        if self.observations.is_empty() {
            return 0.1; // Default selectivity
        }

        // Clamp to 1 so the ratios below never divide by zero.
        let left_ref = left_card.max(1) as f64;
        let right_ref = right_card.max(1) as f64;
        let within_2x =
            |observed: usize, reference: f64| (0.5..=2.0).contains(&(observed as f64 / reference));

        let mut similar: Vec<f64> = self
            .observations
            .iter()
            .filter(|obs| {
                within_2x(obs.left_cardinality, left_ref)
                    && within_2x(obs.right_cardinality, right_ref)
            })
            .map(|obs| obs.selectivity)
            .collect();

        if similar.is_empty() {
            // Use global average
            self.observations.iter().map(|o| o.selectivity).sum::<f64>()
                / self.observations.len() as f64
        } else {
            // Median of the similar observations (upper median for an even
            // count); selection avoids sorting the whole candidate list.
            let mid = similar.len() / 2;
            let (_, median, _) = similar.select_nth_unstable_by(mid, f64::total_cmp);
            *median
        }
    }

    /// Number of observations currently stored.
    fn sample_count(&self) -> usize {
        self.observations.len()
    }

    /// Forget all observations.
    fn clear(&mut self) {
        self.observations.clear();
    }
}
364
365/// Execution history tracker
366#[derive(Debug)]
367struct ExecutionHistory {
368    /// Recent pattern executions
369    executions: Vec<PatternExecution>,
370    /// Maximum history size
371    max_size: usize,
372}
373
374#[derive(Debug, Clone)]
375pub struct PatternExecution {
376    pub pattern: AlgebraTriplePattern,
377    pub cardinality: usize,
378    pub execution_time_ms: u64,
379    pub timestamp: std::time::SystemTime,
380}
381
382impl ExecutionHistory {
383    fn new(max_size: usize) -> Self {
384        Self {
385            executions: Vec::new(),
386            max_size,
387        }
388    }
389
390    fn record(&mut self, execution: PatternExecution) {
391        self.executions.push(execution);
392
393        // Keep only recent history
394        if self.executions.len() > self.max_size {
395            self.executions.remove(0);
396        }
397    }
398
399    fn get_similar_patterns(&self, pattern: &AlgebraTriplePattern) -> Vec<PatternExecution> {
400        self.executions
401            .iter()
402            .filter(|exec| Self::patterns_similar(&exec.pattern, pattern))
403            .cloned()
404            .collect()
405    }
406
407    fn patterns_similar(p1: &AlgebraTriplePattern, p2: &AlgebraTriplePattern) -> bool {
408        // Patterns are similar if they have the same structure (bound/unbound positions)
409        Self::term_pattern_type(&p1.subject) == Self::term_pattern_type(&p2.subject)
410            && Self::term_pattern_type(&p1.predicate) == Self::term_pattern_type(&p2.predicate)
411            && Self::term_pattern_type(&p1.object) == Self::term_pattern_type(&p2.object)
412    }
413
414    fn term_pattern_type(term: &TermPattern) -> &'static str {
415        match term {
416            TermPattern::Variable(_) => "var",
417            TermPattern::NamedNode(_) => "node",
418            TermPattern::BlankNode(_) => "blank",
419            TermPattern::Literal(_) => "literal",
420            TermPattern::QuotedTriple(_) => "quoted",
421        }
422    }
423
424    fn size(&self) -> usize {
425        self.executions.len()
426    }
427
428    fn clear(&mut self) {
429        self.executions.clear();
430    }
431}
432
/// Summary of what an [`AdvancedStatisticsCollector`] currently holds.
///
/// The type is plain data: it can be compared for equality (e.g. to check
/// that two snapshots agree) and `Default` yields the all-zero summary.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AdvancedStatistics {
    /// Number of pattern executions recorded
    pub queries_analyzed: u64,
    /// Number of distinct terms tracked by the subject histogram
    pub subject_histogram_size: usize,
    /// Number of distinct terms tracked by the predicate histogram
    pub predicate_histogram_size: usize,
    /// Number of distinct terms tracked by the object histogram
    pub object_histogram_size: usize,
    /// Number of join observations currently stored
    pub join_samples: usize,
    /// Number of entries currently in the execution history
    pub history_size: usize,
}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{NamedNode, Variable};

    /// Variable term with the given name.
    fn var(name: &str) -> TermPattern {
        TermPattern::Variable(Variable::new(name).expect("valid variable name"))
    }

    /// The pattern `?s foaf:name ?o` shared by the tests below.
    fn create_test_pattern() -> AlgebraTriplePattern {
        let foaf_name = NamedNode::new("http://xmlns.com/foaf/0.1/name").expect("valid IRI");
        AlgebraTriplePattern {
            subject: var("s"),
            predicate: TermPattern::NamedNode(foaf_name),
            object: var("o"),
        }
    }

    /// A fresh collector starts out empty.
    #[test]
    fn test_collector_creation() {
        let stats = AdvancedStatisticsCollector::new().get_statistics();

        assert_eq!(stats.queries_analyzed, 0);
        assert_eq!(stats.history_size, 0);
    }

    /// One recorded execution shows up in both the counter and the history.
    #[test]
    fn test_pattern_recording() {
        let collector = AdvancedStatisticsCollector::new();
        collector.record_pattern_execution(&create_test_pattern(), 100, 50);

        let stats = collector.get_statistics();
        assert_eq!(stats.queries_analyzed, 1);
        assert_eq!(stats.history_size, 1);
    }

    /// The cardinality estimate follows the median of the recorded samples.
    #[test]
    fn test_histogram_estimation() {
        let collector = AdvancedStatisticsCollector::new();
        let pattern = create_test_pattern();

        // Observations 100, 200, ..., 1000
        (1..=10).for_each(|i| collector.record_pattern_execution(&pattern, 100 * i, 10));

        let estimate = collector.estimate_cardinality(&pattern);
        assert!(estimate.is_some());
        let est = estimate.expect("estimate should be available");
        // Should be around median (500-600)
        assert!((400..=700).contains(&est));
    }

    /// Learned join selectivity lands near the observed 0.0001.
    #[test]
    fn test_join_selectivity() {
        let collector = AdvancedStatisticsCollector::new();

        // Two joins with identical selectivity: 100/1M and 400/4M
        for (input_card, result_card) in [(1000, 100), (2000, 400)] {
            collector.record_join_execution(
                &GraphPattern::Bgp(vec![]),
                &GraphPattern::Bgp(vec![]),
                input_card,
                input_card,
                result_card,
            );
        }

        let selectivity = collector.estimate_join_selectivity(1500, 1500);
        assert!(selectivity > 0.00005 && selectivity < 0.002);
    }

    /// The execution history never grows past its cap.
    #[test]
    fn test_history_limit() {
        let collector = AdvancedStatisticsCollector::new();
        let pattern = create_test_pattern();

        // Record more than max_size executions
        let mut remaining = 1500;
        while remaining > 0 {
            collector.record_pattern_execution(&pattern, 100, 10);
            remaining -= 1;
        }

        assert!(collector.get_statistics().history_size <= 1000);
    }

    /// `clear` resets the counter and empties the history.
    #[test]
    fn test_clear_statistics() {
        let collector = AdvancedStatisticsCollector::new();

        collector.record_pattern_execution(&create_test_pattern(), 100, 10);
        collector.clear();

        let stats = collector.get_statistics();
        assert_eq!(stats.queries_analyzed, 0);
        assert_eq!(stats.history_size, 0);
    }
}
555}