Skip to main content

oxirs_core/query/
statistics.rs

1//! Query statistics collection and management using SciRS2-core
2//!
3//! This module provides comprehensive statistics collection for query optimization,
4//! including cardinality estimation, selectivity tracking, and execution metrics.
5
6use crate::model::pattern::TriplePattern;
7use crate::model::Triple;
8use crate::query::algebra::{AlgebraTriplePattern, TermPattern};
9use crate::OxirsError;
10use scirs2_core::metrics::{Counter, Histogram, MetricsRegistry};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant};
16
/// Graph statistics for query optimization
///
/// Collects and maintains statistical information about the RDF graph
/// for use in cost-based query optimization.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` is cheap and a clone
/// shares the same counters and maps as the original instead of copying them.
#[derive(Clone)]
pub struct GraphStatistics {
    /// Total number of triples in the graph
    total_triples: Arc<AtomicU64>,
    /// Total number of distinct subjects
    // NOTE(review): no method in this module writes this counter, so it appears to stay
    // at its initial 0 — confirm before relying on it.
    distinct_subjects: Arc<AtomicU64>,
    /// Total number of distinct predicates
    distinct_predicates: Arc<AtomicU64>,
    /// Total number of distinct objects
    // NOTE(review): like `distinct_subjects`, this counter is never written in this module.
    distinct_objects: Arc<AtomicU64>,
    /// Statistics per predicate, keyed by the predicate IRI string
    predicate_stats: Arc<RwLock<HashMap<String, PredicateStatistics>>>,
    /// Learned selectivity history, keyed by the caller-supplied query signature
    pattern_selectivity: Arc<RwLock<HashMap<String, SelectivityInfo>>>,
    /// Metrics registry (for future use)
    // The field is never read in this module; the allow silences the dead-code lint.
    #[allow(dead_code)]
    metrics: Arc<MetricsRegistry>,
    /// Instant of the most recent statistics update; initialized at construction
    last_updated: Arc<RwLock<Instant>>,
}
41
/// Statistics for a specific predicate
///
/// Incremental inserts/removals on `GraphStatistics` maintain only `count`; the
/// distinct counts and averages are filled in by `GraphStatistics::recompute_from_triples`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredicateStatistics {
    /// Number of triples with this predicate
    pub count: u64,
    /// Number of distinct subjects (only named-node/IRI subjects are counted)
    pub distinct_subjects: u64,
    /// Number of distinct objects (only named-node/IRI objects are counted)
    pub distinct_objects: u64,
    /// Average objects per subject: `count / distinct_subjects`, or 0.0 when no
    /// subject was counted
    pub avg_objects_per_subject: f64,
    /// Average subjects per object (inverse property): `count / distinct_objects`,
    /// or 0.0 when no object was counted
    pub avg_subjects_per_object: f64,
    /// Minimum cardinality observed. Starts at `u64::MAX`.
    // NOTE(review): nothing in this module updates this after initialization.
    pub min_cardinality: u64,
    /// Maximum cardinality observed. Starts at 0.
    // NOTE(review): nothing in this module updates this after initialization.
    pub max_cardinality: u64,
}
60
/// Selectivity information for pattern matching
///
/// One entry is kept per query signature passed to
/// `GraphStatistics::record_query_execution`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectivityInfo {
    /// Signature this entry is keyed by: the caller-supplied `query_signature`, stored
    /// verbatim (any stable string identifying the query)
    pub pattern_signature: String,
    /// Exponential moving average of `actual_results / total_triples`. Nominally
    /// 0.0 to 1.0, but nothing bounds `actual_results` by the triple count, so it can
    /// exceed 1.0.
    pub observed_selectivity: f64,
    /// Number of observations
    pub observation_count: u64,
    /// Timestamp of the most recent observation, in milliseconds
    pub last_observed_ms: u128,
    /// Actual result count from the most recent observation
    pub estimated_result_size: u64,
}
75
/// Query execution statistics for feedback loop
///
/// Passed by value to `GraphStatistics::record_query_execution`, which reads
/// `query_signature`, `execution_time`, `actual_results` and `estimated_results`.
// NOTE(review): `estimated_time`, `memory_bytes` and `cpu_time` are not read anywhere in
// this module; they are carried for callers/serialization only — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryExecutionStats {
    /// Query signature: key under which the learned selectivity is stored
    pub query_signature: String,
    /// Actual execution time
    pub execution_time: Duration,
    /// Estimated execution time
    pub estimated_time: Duration,
    /// Actual result count
    pub actual_results: u64,
    /// Estimated result count (0 means "no estimate"; accuracy is then reported as 1.0)
    pub estimated_results: u64,
    /// Memory used in bytes
    pub memory_bytes: u64,
    /// CPU time used
    pub cpu_time: Duration,
}
94
95impl GraphStatistics {
96    /// Create a new graph statistics collector
97    pub fn new() -> Self {
98        let metrics = MetricsRegistry::new();
99
100        Self {
101            total_triples: Arc::new(AtomicU64::new(0)),
102            distinct_subjects: Arc::new(AtomicU64::new(0)),
103            distinct_predicates: Arc::new(AtomicU64::new(0)),
104            distinct_objects: Arc::new(AtomicU64::new(0)),
105            predicate_stats: Arc::new(RwLock::new(HashMap::new())),
106            pattern_selectivity: Arc::new(RwLock::new(HashMap::new())),
107            metrics: Arc::new(metrics),
108            last_updated: Arc::new(RwLock::new(Instant::now())),
109        }
110    }
111
112    /// Update statistics after inserting a triple
113    pub fn record_insert(&self, triple: &Triple) -> Result<(), OxirsError> {
114        self.total_triples.fetch_add(1, Ordering::Relaxed);
115
116        // Update predicate-specific statistics
117        if let crate::model::Predicate::NamedNode(predicate) = triple.predicate() {
118            let pred_str = predicate.as_str().to_string();
119
120            let mut stats = self.predicate_stats.write().map_err(|e| {
121                OxirsError::Store(format!("Failed to write predicate stats: {}", e))
122            })?;
123
124            let pred_stat = stats
125                .entry(pred_str.clone())
126                .or_insert_with(|| PredicateStatistics {
127                    count: 0,
128                    distinct_subjects: 0,
129                    distinct_objects: 0,
130                    avg_objects_per_subject: 0.0,
131                    avg_subjects_per_object: 0.0,
132                    min_cardinality: u64::MAX,
133                    max_cardinality: 0,
134                });
135
136            pred_stat.count += 1;
137
138            // Update metrics
139            let counter = Counter::new("graph.triples.total".to_string());
140            counter.add(1);
141
142            let pred_counter = Counter::new(format!("graph.predicate.{}.count", pred_str));
143            pred_counter.add(1);
144        }
145
146        // Update last updated timestamp
147        if let Ok(mut last) = self.last_updated.write() {
148            *last = Instant::now();
149        }
150
151        Ok(())
152    }
153
154    /// Update statistics after removing a triple
155    pub fn record_remove(&self, triple: &Triple) -> Result<(), OxirsError> {
156        let current = self.total_triples.load(Ordering::Relaxed);
157        if current > 0 {
158            self.total_triples.fetch_sub(1, Ordering::Relaxed);
159        }
160
161        // Update predicate-specific statistics
162        if let crate::model::Predicate::NamedNode(predicate) = triple.predicate() {
163            let pred_str = predicate.as_str().to_string();
164
165            let mut stats = self.predicate_stats.write().map_err(|e| {
166                OxirsError::Store(format!("Failed to write predicate stats: {}", e))
167            })?;
168
169            if let Some(pred_stat) = stats.get_mut(&pred_str) {
170                if pred_stat.count > 0 {
171                    pred_stat.count -= 1;
172                }
173            }
174
175            // Update metrics
176            let counter = Counter::new("graph.triples.removed".to_string());
177            counter.add(1);
178        }
179
180        // Update last updated timestamp
181        if let Ok(mut last) = self.last_updated.write() {
182            *last = Instant::now();
183        }
184
185        Ok(())
186    }
187
188    /// Get total number of triples
189    pub fn total_triples(&self) -> u64 {
190        self.total_triples.load(Ordering::Relaxed)
191    }
192
193    /// Get statistics for a specific predicate
194    pub fn get_predicate_stats(&self, predicate: &str) -> Option<PredicateStatistics> {
195        self.predicate_stats.read().ok()?.get(predicate).cloned()
196    }
197
198    /// Estimate cardinality for a triple pattern
199    pub fn estimate_pattern_cardinality(&self, pattern: &TriplePattern) -> u64 {
200        let total = self.total_triples() as f64;
201        if total == 0.0 {
202            return 0;
203        }
204
205        let mut selectivity = 1.0;
206
207        // Adjust selectivity based on bound terms
208        if pattern.subject().is_some() {
209            selectivity *= 0.001; // Bound subject is very selective
210        } else {
211            selectivity *= 0.5;
212        }
213
214        if let Some(crate::model::pattern::PredicatePattern::NamedNode(pred)) = pattern.predicate()
215        {
216            // Use actual predicate statistics if available
217            if let Some(stats) = self.get_predicate_stats(pred.as_str()) {
218                let pred_selectivity = stats.count as f64 / total;
219                selectivity *= pred_selectivity;
220            } else {
221                selectivity *= 0.1; // Default predicate selectivity
222            }
223        } else {
224            selectivity *= 0.5;
225        }
226
227        if pattern.object().is_some() {
228            selectivity *= 0.001; // Bound object is very selective
229        } else {
230            selectivity *= 0.5;
231        }
232
233        (total * selectivity).max(1.0) as u64
234    }
235
236    /// Estimate cardinality for algebra triple pattern
237    pub fn estimate_algebra_pattern_cardinality(&self, pattern: &AlgebraTriplePattern) -> u64 {
238        let total = self.total_triples() as f64;
239        if total == 0.0 {
240            return 0;
241        }
242
243        let mut selectivity = 1.0;
244
245        // Adjust based on bound terms
246        match &pattern.subject {
247            TermPattern::Variable(_) => selectivity *= 0.5,
248            _ => selectivity *= 0.001,
249        }
250
251        match &pattern.predicate {
252            TermPattern::Variable(_) => selectivity *= 0.5,
253            TermPattern::NamedNode(pred) => {
254                if let Some(stats) = self.get_predicate_stats(pred.as_str()) {
255                    selectivity *= stats.count as f64 / total;
256                } else {
257                    selectivity *= 0.1;
258                }
259            }
260            _ => selectivity *= 0.1,
261        }
262
263        match &pattern.object {
264            TermPattern::Variable(_) => selectivity *= 0.5,
265            _ => selectivity *= 0.001,
266        }
267
268        (total * selectivity).max(1.0) as u64
269    }
270
271    /// Record actual query execution for adaptive learning
272    pub fn record_query_execution(&self, stats: QueryExecutionStats) -> Result<(), OxirsError> {
273        // Record metrics using counters
274        let exec_counter = Counter::new("query.execution.total".to_string());
275        exec_counter.add(1);
276
277        let time_counter = Counter::new("query.execution.time_ms".to_string());
278        time_counter.add(stats.execution_time.as_millis() as u64);
279
280        let accuracy_ratio = if stats.estimated_results > 0 {
281            stats.actual_results as f64 / stats.estimated_results as f64
282        } else {
283            1.0
284        };
285
286        let histogram = Histogram::new("query.estimation.accuracy".to_string());
287        histogram.observe(accuracy_ratio);
288
289        // Update selectivity information based on actual results
290        let observed_selectivity = if self.total_triples() > 0 {
291            stats.actual_results as f64 / self.total_triples() as f64
292        } else {
293            0.0
294        };
295
296        let mut pattern_sel = self
297            .pattern_selectivity
298            .write()
299            .map_err(|e| OxirsError::Query(format!("Failed to write selectivity: {}", e)))?;
300
301        let selectivity_info = pattern_sel
302            .entry(stats.query_signature.clone())
303            .or_insert_with(|| SelectivityInfo {
304                pattern_signature: stats.query_signature.clone(),
305                observed_selectivity: 0.0,
306                observation_count: 0,
307                last_observed_ms: 0,
308                estimated_result_size: 0,
309            });
310
311        // Update with exponential moving average
312        let alpha = 0.3; // Weight for new observation
313        selectivity_info.observed_selectivity =
314            alpha * observed_selectivity + (1.0 - alpha) * selectivity_info.observed_selectivity;
315        selectivity_info.observation_count += 1;
316        selectivity_info.last_observed_ms = Instant::now().elapsed().as_millis();
317        selectivity_info.estimated_result_size = stats.actual_results;
318
319        Ok(())
320    }
321
322    /// Get learned selectivity for a pattern
323    pub fn get_learned_selectivity(&self, pattern_signature: &str) -> Option<f64> {
324        self.pattern_selectivity
325            .read()
326            .ok()?
327            .get(pattern_signature)
328            .map(|info| info.observed_selectivity)
329    }
330
331    /// Export statistics to JSON for persistence
332    pub fn export_to_json(&self) -> Result<String, OxirsError> {
333        let stats = self
334            .predicate_stats
335            .read()
336            .map_err(|e| OxirsError::Serialize(format!("Failed to read stats: {}", e)))?;
337
338        serde_json::to_string_pretty(&*stats).map_err(|e| OxirsError::Serialize(e.to_string()))
339    }
340
341    /// Import statistics from JSON
342    pub fn import_from_json(&self, json: &str) -> Result<(), OxirsError> {
343        let stats: HashMap<String, PredicateStatistics> =
344            serde_json::from_str(json).map_err(|e| OxirsError::Parse(e.to_string()))?;
345
346        let mut current_stats = self
347            .predicate_stats
348            .write()
349            .map_err(|e| OxirsError::Store(format!("Failed to write stats: {}", e)))?;
350
351        *current_stats = stats;
352
353        // Recalculate total triples
354        let total: u64 = current_stats.values().map(|s| s.count).sum();
355        self.total_triples.store(total, Ordering::Relaxed);
356
357        Ok(())
358    }
359
360    /// Full statistics recomputation (expensive operation)
361    pub fn recompute_from_triples(&self, triples: &[Triple]) -> Result<(), OxirsError> {
362        tracing::info!("Recomputing statistics from {} triples", triples.len());
363
364        let start = Instant::now();
365
366        // Reset counters
367        self.total_triples
368            .store(triples.len() as u64, Ordering::Relaxed);
369
370        let mut predicate_counts: HashMap<String, PredicateStatistics> = HashMap::new();
371        let mut subject_counts: HashMap<String, u64> = HashMap::new();
372        let mut object_counts: HashMap<String, u64> = HashMap::new();
373
374        // First pass: count triples per predicate
375        for triple in triples {
376            if let crate::model::Predicate::NamedNode(pred) = triple.predicate() {
377                let pred_str = pred.as_str().to_string();
378
379                let stat = predicate_counts.entry(pred_str.clone()).or_insert_with(|| {
380                    PredicateStatistics {
381                        count: 0,
382                        distinct_subjects: 0,
383                        distinct_objects: 0,
384                        avg_objects_per_subject: 0.0,
385                        avg_subjects_per_object: 0.0,
386                        min_cardinality: u64::MAX,
387                        max_cardinality: 0,
388                    }
389                });
390
391                stat.count += 1;
392
393                // Track subjects and objects for this predicate
394                if let crate::model::Subject::NamedNode(subj) = triple.subject() {
395                    *subject_counts
396                        .entry(format!("{}:{}", pred_str, subj.as_str()))
397                        .or_insert(0) += 1;
398                }
399
400                if let crate::model::Object::NamedNode(obj) = triple.object() {
401                    *object_counts
402                        .entry(format!("{}:{}", pred_str, obj.as_str()))
403                        .or_insert(0) += 1;
404                }
405            }
406        }
407
408        // Second pass: calculate distinct counts and averages
409        for (pred_str, stat) in predicate_counts.iter_mut() {
410            let prefix = format!("{}:", pred_str);
411
412            stat.distinct_subjects = subject_counts
413                .keys()
414                .filter(|k| k.starts_with(&prefix))
415                .count() as u64;
416
417            stat.distinct_objects = object_counts
418                .keys()
419                .filter(|k| k.starts_with(&prefix))
420                .count() as u64;
421
422            if stat.distinct_subjects > 0 {
423                stat.avg_objects_per_subject = stat.count as f64 / stat.distinct_subjects as f64;
424            }
425
426            if stat.distinct_objects > 0 {
427                stat.avg_subjects_per_object = stat.count as f64 / stat.distinct_objects as f64;
428            }
429        }
430
431        // Update stored statistics
432        let mut stats = self
433            .predicate_stats
434            .write()
435            .map_err(|e| OxirsError::Store(format!("Failed to write stats: {}", e)))?;
436        *stats = predicate_counts;
437
438        // Update distinct counts
439        self.distinct_predicates
440            .store(stats.len() as u64, Ordering::Relaxed);
441
442        let elapsed = start.elapsed();
443        tracing::info!("Statistics recomputation completed in {:?}", elapsed);
444
445        Ok(())
446    }
447
448    /// Get all statistics as a summary
449    pub fn summary(&self) -> StatisticsSummary {
450        StatisticsSummary {
451            total_triples: self.total_triples(),
452            distinct_subjects: self.distinct_subjects.load(Ordering::Relaxed),
453            distinct_predicates: self.distinct_predicates.load(Ordering::Relaxed),
454            distinct_objects: self.distinct_objects.load(Ordering::Relaxed),
455            predicate_count: self
456                .predicate_stats
457                .read()
458                .ok()
459                .map(|s| s.len())
460                .unwrap_or(0),
461            last_updated: self.last_updated.read().ok().map(|t| *t),
462        }
463    }
464}
465
466impl Default for GraphStatistics {
467    fn default() -> Self {
468        Self::new()
469    }
470}
471
/// Summary of graph statistics
///
/// A point-in-time snapshot produced by `GraphStatistics::summary`. Values are copied
/// out of the live counters, so later updates are not reflected here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatisticsSummary {
    /// Total number of triples at snapshot time
    pub total_triples: u64,
    /// Value of the distinct-subjects counter
    pub distinct_subjects: u64,
    /// Value of the distinct-predicates counter
    pub distinct_predicates: u64,
    /// Value of the distinct-objects counter
    pub distinct_objects: u64,
    /// Number of entries in the per-predicate statistics map (0 if it could not be read)
    pub predicate_count: usize,
    /// Time of the last statistics update, or `None` if it could not be read.
    /// Skipped by serde, so a deserialized summary always has `None` here.
    #[serde(skip)]
    pub last_updated: Option<Instant>,
}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    const SUBJECT_IRI: &str = "http://example.org/s";
    const PREDICATE_IRI: &str = "http://example.org/p";

    /// The triple `<SUBJECT_IRI> <PREDICATE_IRI> "value"` shared by several tests.
    fn sample_triple() -> Triple {
        Triple::new(
            NamedNode::new(SUBJECT_IRI).expect("valid IRI"),
            NamedNode::new(PREDICATE_IRI).expect("valid IRI"),
            Literal::new("value"),
        )
    }

    #[test]
    fn test_statistics_creation() {
        assert_eq!(GraphStatistics::new().total_triples(), 0);
    }

    #[test]
    fn test_record_insert() {
        let stats = GraphStatistics::new();
        stats
            .record_insert(&sample_triple())
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 1);
    }

    #[test]
    fn test_record_remove() {
        let stats = GraphStatistics::new();
        let triple = sample_triple();

        stats
            .record_insert(&triple)
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 1);

        stats
            .record_remove(&triple)
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 0);
    }

    #[test]
    fn test_predicate_statistics() {
        let stats = GraphStatistics::new();
        let predicate = NamedNode::new(PREDICATE_IRI).expect("valid IRI");
        let triple = Triple::new(
            NamedNode::new(SUBJECT_IRI).expect("valid IRI"),
            predicate.clone(),
            Literal::new("value"),
        );

        stats
            .record_insert(&triple)
            .expect("operation should succeed");

        let found = stats.get_predicate_stats(predicate.as_str());
        assert!(found.is_some());
        assert_eq!(found.expect("predicate stats should exist").count, 1);
    }

    #[test]
    fn test_pattern_cardinality_estimation() {
        let stats = GraphStatistics::new();

        for n in 0..100 {
            let triple = Triple::new(
                NamedNode::new(format!("http://example.org/s{}", n))
                    .expect("valid IRI from format"),
                NamedNode::new(PREDICATE_IRI).expect("valid IRI"),
                Literal::new(format!("value{}", n)),
            );
            stats
                .record_insert(&triple)
                .expect("operation should succeed");
        }

        // Only the predicate is bound.
        let bound_predicate = crate::model::pattern::PredicatePattern::NamedNode(
            NamedNode::new(PREDICATE_IRI).expect("valid IRI"),
        );
        let pattern = TriplePattern::new(None, Some(bound_predicate), None);

        let estimated = stats.estimate_pattern_cardinality(&pattern);
        assert!((1..=100).contains(&estimated));
    }

    #[test]
    fn test_query_execution_recording() {
        let stats = GraphStatistics::new();
        let signature = "SELECT ?s WHERE { ?s ?p ?o }";

        let exec_stats = QueryExecutionStats {
            query_signature: signature.to_string(),
            execution_time: Duration::from_millis(50),
            estimated_time: Duration::from_millis(100),
            actual_results: 42,
            estimated_results: 50,
            memory_bytes: 1024 * 1024,
            cpu_time: Duration::from_millis(30),
        };

        stats
            .record_query_execution(exec_stats)
            .expect("operation should succeed");

        assert!(stats.get_learned_selectivity(signature).is_some());
    }

    #[test]
    fn test_statistics_export_import() {
        let source = GraphStatistics::new();
        source
            .record_insert(&sample_triple())
            .expect("operation should succeed");

        let json = source.export_to_json().expect("operation should succeed");
        assert!(!json.is_empty());

        let restored = GraphStatistics::new();
        restored
            .import_from_json(&json)
            .expect("operation should succeed");
        assert_eq!(restored.total_triples(), 1);
    }

    #[test]
    fn test_statistics_summary() {
        let summary = GraphStatistics::new().summary();
        assert_eq!(summary.total_triples, 0);
        assert_eq!(summary.predicate_count, 0);
    }
}