Skip to main content

oxirs_arq/stats/
runtime_stats.rs

1//! Runtime Statistics Collector for Query Feedback
2//!
3//! Collects real execution statistics per query and feeds them back to the
4//! optimizer for adaptive plan improvement over time.  The collector maintains
5//! a bounded circular history of `QueryExecutionStats` records, computes
6//! exponential-moving-average selectivity per pattern, and exposes analysis
7//! helpers such as `slowest_queries` and `pattern_hit_rate`.
8
9use std::collections::HashMap;
10
11// ---------------------------------------------------------------------------
12// Pattern-level stats
13// ---------------------------------------------------------------------------
14
/// Per-pattern execution record captured while a query runs.
///
/// Pairs the optimizer's predicted cardinality with the cardinality that was
/// actually observed, plus timing and cache information for one
/// triple-pattern.
#[derive(Debug, Clone, PartialEq)]
pub struct PatternStats {
    /// Human-readable pattern identifier (e.g. `"?s rdf:type ?t"`).
    pub pattern: String,
    /// Cardinality predicted by the optimizer before execution.
    pub cardinality_estimate: usize,
    /// Cardinality actually observed during execution.
    pub actual_cardinality: usize,
    /// Wall-clock evaluation time of this pattern, in milliseconds.
    pub evaluation_time_ms: u64,
    /// `true` when the result came from a cache instead of being evaluated.
    pub cache_hit: bool,
}

impl PatternStats {
    /// Build a `PatternStats` record from its individual components.
    ///
    /// `pattern` accepts anything convertible into a `String`; the remaining
    /// arguments are stored unchanged in the fields of the same name.
    pub fn new(
        pattern: impl Into<String>,
        cardinality_estimate: usize,
        actual_cardinality: usize,
        evaluation_time_ms: u64,
        cache_hit: bool,
    ) -> Self {
        let pattern = pattern.into();
        Self {
            pattern,
            cardinality_estimate,
            actual_cardinality,
            evaluation_time_ms,
            cache_hit,
        }
    }

    /// Observed cardinality divided by estimated cardinality.
    ///
    /// A result below 1 means the optimizer over-estimated; a result above 1
    /// means it under-estimated.
    pub fn cardinality_ratio(&self) -> f64 {
        // Clamp the divisor to 1 so a zero estimate never divides by zero.
        let divisor = self.cardinality_estimate.max(1) as f64;
        let observed = self.actual_cardinality as f64;
        observed / divisor
    }
}
54
55// ---------------------------------------------------------------------------
56// Query-level stats
57// ---------------------------------------------------------------------------
58
59/// Aggregated execution statistics for a complete SPARQL query evaluation.
60#[derive(Debug, Clone, PartialEq)]
61pub struct QueryExecutionStats {
62    /// Unique identifier for the query execution (e.g. UUID or hash string).
63    pub query_id: String,
64    /// Total wall-clock time for the entire query (milliseconds).
65    pub total_time_ms: u64,
66    /// Per-pattern breakdown.
67    pub pattern_stats: Vec<PatternStats>,
68    /// Number of result rows produced.
69    pub result_count: usize,
70    /// Number of join operations performed.
71    pub join_count: usize,
72    /// Number of FILTER evaluations performed.
73    pub filter_count: usize,
74    /// Cache hits encountered during evaluation.
75    pub cache_hits: usize,
76    /// Cache misses encountered during evaluation.
77    pub cache_misses: usize,
78}
79
80impl QueryExecutionStats {
81    /// Construct a new `QueryExecutionStats` with the given fields.
82    #[allow(clippy::too_many_arguments)]
83    pub fn new(
84        query_id: impl Into<String>,
85        total_time_ms: u64,
86        pattern_stats: Vec<PatternStats>,
87        result_count: usize,
88        join_count: usize,
89        filter_count: usize,
90        cache_hits: usize,
91        cache_misses: usize,
92    ) -> Self {
93        Self {
94            query_id: query_id.into(),
95            total_time_ms,
96            pattern_stats,
97            result_count,
98            join_count,
99            filter_count,
100            cache_hits,
101            cache_misses,
102        }
103    }
104
105    /// Overall cache hit rate `[0.0, 1.0]` for this query execution.
106    pub fn cache_hit_rate(&self) -> f64 {
107        let total = self.cache_hits + self.cache_misses;
108        if total == 0 {
109            0.0
110        } else {
111            self.cache_hits as f64 / total as f64
112        }
113    }
114}
115
116// ---------------------------------------------------------------------------
117// Collector
118// ---------------------------------------------------------------------------
119
120/// Smoothing coefficient for exponential moving average selectivity updates.
121const EMA_ALPHA: f64 = 0.3;
122
123/// Collects runtime execution statistics and maintains adaptive selectivity
124/// estimates per observed triple-pattern.
125///
126/// The history is bounded: once it reaches `max_history` entries the oldest
127/// record is dropped (FIFO).
128pub struct RuntimeStatsCollector {
129    /// Circular history of query execution records (newest at the back).
130    history: Vec<QueryExecutionStats>,
131    /// Maximum number of history entries retained.
132    max_history: usize,
133    /// EMA-based selectivity per pattern key.
134    /// `selectivity(p) = actual_cardinality / max(estimated_cardinality, 1)`.
135    pattern_selectivity: HashMap<String, f64>,
136}
137
138impl RuntimeStatsCollector {
139    /// Create a new collector that retains at most `max_history` records.
140    ///
141    /// `max_history` is clamped to at least 1.
142    pub fn new(max_history: usize) -> Self {
143        Self {
144            history: Vec::new(),
145            max_history: max_history.max(1),
146            pattern_selectivity: HashMap::new(),
147        }
148    }
149
150    /// Record a completed query execution.
151    ///
152    /// This also updates the per-pattern selectivity estimates for every
153    /// pattern in the provided stats.
154    pub fn record(&mut self, stats: QueryExecutionStats) {
155        // Update per-pattern selectivity from the new observation.
156        for ps in &stats.pattern_stats {
157            self.update_selectivity(
158                &ps.pattern.clone(),
159                ps.actual_cardinality,
160                ps.cardinality_estimate,
161            );
162        }
163
164        // Maintain bounded history (FIFO eviction).
165        if self.history.len() >= self.max_history {
166            self.history.remove(0);
167        }
168        self.history.push(stats);
169    }
170
171    /// Update the EMA selectivity for `pattern` given a new observation.
172    ///
173    /// `selectivity = actual / max(estimated, 1)`.
174    /// The update rule is: `new_ema = alpha * observation + (1 - alpha) * old_ema`.
175    pub fn update_selectivity(&mut self, pattern: &str, actual: usize, estimated: usize) {
176        let observation = actual as f64 / estimated.max(1) as f64;
177        let entry = self
178            .pattern_selectivity
179            .entry(pattern.to_string())
180            .or_insert(observation);
181        // Apply EMA only if a prior estimate exists (otherwise just set it).
182        *entry = EMA_ALPHA * observation + (1.0 - EMA_ALPHA) * *entry;
183    }
184
185    /// Return the current EMA selectivity estimate for `pattern`.
186    ///
187    /// Returns `1.0` (no selectivity information) when the pattern has never
188    /// been observed.
189    pub fn get_selectivity(&self, pattern: &str) -> f64 {
190        self.pattern_selectivity
191            .get(pattern)
192            .copied()
193            .unwrap_or(1.0)
194    }
195
196    /// Mean query execution time across all records in the history (milliseconds).
197    ///
198    /// Returns `0.0` when the history is empty.
199    pub fn avg_query_time(&self) -> f64 {
200        if self.history.is_empty() {
201            return 0.0;
202        }
203        let sum: u64 = self.history.iter().map(|q| q.total_time_ms).sum();
204        sum as f64 / self.history.len() as f64
205    }
206
207    /// Return up to `n` references to the slowest queries, sorted by
208    /// `total_time_ms` descending.
209    pub fn slowest_queries(&self, n: usize) -> Vec<&QueryExecutionStats> {
210        let mut refs: Vec<&QueryExecutionStats> = self.history.iter().collect();
211        refs.sort_by(|a, b| b.total_time_ms.cmp(&a.total_time_ms));
212        refs.into_iter().take(n).collect()
213    }
214
215    /// Fraction of evaluations of `pattern` that were cache hits.
216    ///
217    /// Returns `0.0` when the pattern has never been observed.
218    pub fn pattern_hit_rate(&self, pattern: &str) -> f64 {
219        let (hits, total) = self
220            .history
221            .iter()
222            .flat_map(|q| &q.pattern_stats)
223            .filter(|ps| ps.pattern == pattern)
224            .fold((0usize, 0usize), |(h, t), ps| {
225                (if ps.cache_hit { h + 1 } else { h }, t + 1)
226            });
227        if total == 0 {
228            0.0
229        } else {
230            hits as f64 / total as f64
231        }
232    }
233
234    /// Total number of query records currently stored in the history.
235    pub fn total_queries(&self) -> usize {
236        self.history.len()
237    }
238
239    /// Clear all history and reset all selectivity estimates.
240    pub fn reset(&mut self) {
241        self.history.clear();
242        self.pattern_selectivity.clear();
243    }
244
245    /// Immutable access to the full history slice.
246    pub fn history(&self) -> &[QueryExecutionStats] {
247        &self.history
248    }
249
250    /// Number of distinct patterns for which selectivity data has been collected.
251    pub fn tracked_pattern_count(&self) -> usize {
252        self.pattern_selectivity.len()
253    }
254}
255
256// ---------------------------------------------------------------------------
257// Tests
258// ---------------------------------------------------------------------------
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `actual` lies strictly within `tolerance` of `expected`.
    fn assert_close(actual: f64, expected: f64, tolerance: f64) {
        assert!((actual - expected).abs() < tolerance);
    }

    /// Build a `PatternStats` fixture.
    fn make_pattern_stats(
        pattern: &str,
        estimated: usize,
        actual: usize,
        ms: u64,
        cache_hit: bool,
    ) -> PatternStats {
        PatternStats::new(pattern, estimated, actual, ms, cache_hit)
    }

    /// Build a `QueryExecutionStats` fixture whose cache counters are derived
    /// from the `cache_hit` flags of `patterns`.
    fn make_query_stats(id: &str, ms: u64, patterns: Vec<PatternStats>) -> QueryExecutionStats {
        let hits = patterns.iter().filter(|p| p.cache_hit).count();
        let misses = patterns.iter().filter(|p| !p.cache_hit).count();
        QueryExecutionStats::new(id, ms, patterns, 10, 2, 1, hits, misses)
    }

    // ------------------------------------------------------------------
    // PatternStats helpers
    // ------------------------------------------------------------------

    #[test]
    fn test_pattern_stats_construction() {
        let built = make_pattern_stats("?s rdf:type ?t", 100, 50, 5, false);
        let expected = PatternStats {
            pattern: "?s rdf:type ?t".to_string(),
            cardinality_estimate: 100,
            actual_cardinality: 50,
            evaluation_time_ms: 5,
            cache_hit: false,
        };
        assert_eq!(built, expected);
    }

    #[test]
    fn test_pattern_stats_cardinality_ratio() {
        let half = make_pattern_stats("?s ?p ?o", 200, 100, 10, false);
        assert_close(half.cardinality_ratio(), 0.5, 1e-10);
    }

    #[test]
    fn test_pattern_stats_cardinality_ratio_zero_estimate() {
        // A zero estimate is clamped to 1, so the ratio equals the actual count.
        let unestimated = make_pattern_stats("?s ?p ?o", 0, 5, 1, false);
        assert_close(unestimated.cardinality_ratio(), 5.0, 1e-10);
    }

    #[test]
    fn test_pattern_stats_cache_hit_flag() {
        assert!(make_pattern_stats("?x owl:sameAs ?y", 50, 50, 0, true).cache_hit);
    }

    // ------------------------------------------------------------------
    // QueryExecutionStats helpers
    // ------------------------------------------------------------------

    #[test]
    fn test_query_stats_cache_hit_rate() {
        // Two hits and two misses.
        let patterns: Vec<PatternStats> = [("p1", true), ("p2", false), ("p3", true), ("p4", false)]
            .iter()
            .map(|&(name, hit)| make_pattern_stats(name, 10, 10, 1, hit))
            .collect();
        let stats = make_query_stats("q1", 100, patterns);
        assert_close(stats.cache_hit_rate(), 0.5, 1e-10);
    }

    #[test]
    fn test_query_stats_zero_cache_rate_when_no_patterns() {
        let stats = make_query_stats("q_empty", 10, Vec::new());
        assert_eq!(stats.cache_hit_rate(), 0.0);
    }

    // ------------------------------------------------------------------
    // RuntimeStatsCollector – basic
    // ------------------------------------------------------------------

    #[test]
    fn test_collector_starts_empty() {
        let fresh = RuntimeStatsCollector::new(50);
        assert_eq!(fresh.total_queries(), 0);
        assert_eq!(fresh.avg_query_time(), 0.0);
        assert_eq!(fresh.tracked_pattern_count(), 0);
    }

    #[test]
    fn test_collector_records_single_query() {
        let mut collector = RuntimeStatsCollector::new(50);
        let only_pattern = make_pattern_stats("p1", 50, 30, 10, false);
        collector.record(make_query_stats("q1", 120, vec![only_pattern]));
        assert_eq!(collector.total_queries(), 1);
        assert_eq!(collector.avg_query_time(), 120.0);
    }

    #[test]
    fn test_collector_avg_time_multiple_queries() {
        let mut collector = RuntimeStatsCollector::new(50);
        for (id, ms) in [("q1", 100), ("q2", 200), ("q3", 300)] {
            collector.record(make_query_stats(id, ms, Vec::new()));
        }
        assert_close(collector.avg_query_time(), 200.0, 1e-10);
    }

    #[test]
    fn test_collector_history_bounded() {
        let max = 3;
        let mut collector = RuntimeStatsCollector::new(max);
        (0..6u64)
            .map(|i| make_query_stats(&format!("q{i}"), i * 10, Vec::new()))
            .for_each(|stats| collector.record(stats));

        // The history never grows past the configured maximum ...
        assert_eq!(collector.total_queries(), max);
        // ... and it is the oldest entries that were dropped.
        let retained: Vec<&str> = collector
            .history()
            .iter()
            .map(|q| q.query_id.as_str())
            .collect();
        assert_eq!(retained, ["q3", "q4", "q5"]);
    }

    #[test]
    fn test_collector_reset_clears_everything() {
        let mut collector = RuntimeStatsCollector::new(10);
        collector.record(make_query_stats("q1", 100, Vec::new()));
        collector.update_selectivity("p1", 10, 20);

        collector.reset();

        assert_eq!(collector.total_queries(), 0);
        assert_eq!(collector.tracked_pattern_count(), 0);
        assert_eq!(collector.avg_query_time(), 0.0);
    }

    // ------------------------------------------------------------------
    // Selectivity EMA
    // ------------------------------------------------------------------

    #[test]
    fn test_get_selectivity_unknown_pattern_returns_one() {
        let empty = RuntimeStatsCollector::new(10);
        assert_eq!(empty.get_selectivity("unknown"), 1.0);
    }

    #[test]
    fn test_update_selectivity_sets_initial_value() {
        let mut collector = RuntimeStatsCollector::new(10);
        // 10 actual rows against an estimate of 100 is an observation of 0.1,
        // and a single observation must leave the estimate at that value.
        collector.update_selectivity("p1", 10, 100);
        assert_close(collector.get_selectivity("p1"), 0.1, 1e-9);
    }

    #[test]
    fn test_selectivity_ema_converges() {
        let mut collector = RuntimeStatsCollector::new(100);
        // Feed 200 identical observations of 0.5 (actual is half the estimate).
        std::iter::repeat((50usize, 100usize))
            .take(200)
            .for_each(|(actual, estimated)| {
                collector.update_selectivity("p_conv", actual, estimated)
            });
        // A constant input must drive the EMA to that constant.
        assert_close(collector.get_selectivity("p_conv"), 0.5, 1e-6);
    }

    #[test]
    fn test_selectivity_updated_on_record() {
        let mut collector = RuntimeStatsCollector::new(50);
        let observed = make_pattern_stats("?x :p ?y", 100, 10, 5, false);
        collector.record(make_query_stats("q1", 50, vec![observed]));
        // Recording must have replaced the "unknown pattern" default of 1.0.
        assert_ne!(collector.get_selectivity("?x :p ?y"), 1.0);
    }

    // ------------------------------------------------------------------
    // slowest_queries
    // ------------------------------------------------------------------

    #[test]
    fn test_slowest_queries_returns_descending_order() {
        let mut collector = RuntimeStatsCollector::new(20);
        for (id, ms) in [("fast", 10), ("slow", 500), ("medium", 150)] {
            collector.record(make_query_stats(id, ms, Vec::new()));
        }

        let order: Vec<&str> = collector
            .slowest_queries(2)
            .iter()
            .map(|q| q.query_id.as_str())
            .collect();
        assert_eq!(order, ["slow", "medium"]);
    }

    #[test]
    fn test_slowest_queries_n_exceeds_history() {
        let mut collector = RuntimeStatsCollector::new(10);
        collector.record(make_query_stats("q1", 100, Vec::new()));
        // Asking for more entries than exist yields everything that is stored.
        assert_eq!(collector.slowest_queries(50).len(), 1);
    }

    #[test]
    fn test_slowest_queries_empty_history() {
        let empty = RuntimeStatsCollector::new(10);
        assert!(empty.slowest_queries(5).is_empty());
    }

    // ------------------------------------------------------------------
    // pattern_hit_rate
    // ------------------------------------------------------------------

    #[test]
    fn test_pattern_hit_rate_all_hits() {
        let mut collector = RuntimeStatsCollector::new(20);
        (0..4).for_each(|_| {
            let hot = make_pattern_stats("hot_pattern", 10, 10, 0, true);
            collector.record(make_query_stats("q", 10, vec![hot]));
        });
        assert_close(collector.pattern_hit_rate("hot_pattern"), 1.0, 1e-10);
    }

    #[test]
    fn test_pattern_hit_rate_no_hits() {
        let mut collector = RuntimeStatsCollector::new(20);
        (0..3).for_each(|_| {
            let cold = make_pattern_stats("cold_pattern", 10, 10, 5, false);
            collector.record(make_query_stats("q", 10, vec![cold]));
        });
        assert_eq!(collector.pattern_hit_rate("cold_pattern"), 0.0);
    }

    #[test]
    fn test_pattern_hit_rate_mixed() {
        let mut collector = RuntimeStatsCollector::new(20);
        // Alternating hit / miss: two of each, so the rate is 50%.
        for (id, hit) in [("q1", true), ("q2", false), ("q3", true), ("q4", false)] {
            let observed = make_pattern_stats("mp", 10, 10, 1, hit);
            collector.record(make_query_stats(id, 10, vec![observed]));
        }
        assert_close(collector.pattern_hit_rate("mp"), 0.5, 1e-10);
    }

    #[test]
    fn test_pattern_hit_rate_unknown_pattern() {
        let empty = RuntimeStatsCollector::new(10);
        assert_eq!(empty.pattern_hit_rate("never_seen"), 0.0);
    }

    // ------------------------------------------------------------------
    // tracked_pattern_count
    // ------------------------------------------------------------------

    #[test]
    fn test_tracked_pattern_count_grows() {
        let mut collector = RuntimeStatsCollector::new(20);
        for key in ["p1", "p2"] {
            collector.update_selectivity(key, 5, 10);
        }
        assert_eq!(collector.tracked_pattern_count(), 2);
    }

    #[test]
    fn test_tracked_pattern_count_no_duplicate() {
        let mut collector = RuntimeStatsCollector::new(20);
        for actual in [5, 6] {
            collector.update_selectivity("p1", actual, 10);
        }
        // Both updates hit the same key, so exactly one entry is tracked.
        assert_eq!(collector.tracked_pattern_count(), 1);
    }
}