Skip to main content

oxirs_arq/
query_profiler.rs

1//! SPARQL query profiling for performance analysis
2//!
3//! This module provides comprehensive profiling capabilities for SPARQL query execution,
4//! tracking parse time, planning time, execution time, and resource usage.
5
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9/// Query execution phase
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub enum QueryPhase {
12    /// Query parsing phase
13    Parsing,
14    /// Query planning/optimization phase
15    Planning,
16    /// Query execution phase
17    Execution,
18    /// Result materialization phase
19    Materialization,
20}
21
22impl QueryPhase {
23    /// Get human-readable name
24    pub fn name(&self) -> &'static str {
25        match self {
26            QueryPhase::Parsing => "Parsing",
27            QueryPhase::Planning => "Planning",
28            QueryPhase::Execution => "Execution",
29            QueryPhase::Materialization => "Materialization",
30        }
31    }
32}
33
34/// Statistics for a single query execution
35#[derive(Debug, Clone)]
36pub struct QueryStats {
37    /// Query text or ID
38    pub query_id: String,
39    /// Total execution time
40    pub total_duration: Duration,
41    /// Time spent in each phase
42    pub phase_durations: HashMap<QueryPhase, Duration>,
43    /// Number of triples matched
44    pub triples_matched: usize,
45    /// Number of results returned
46    pub results_count: usize,
47    /// Peak memory usage (bytes)
48    pub peak_memory: usize,
49    /// Number of joins performed
50    pub joins_performed: usize,
51    /// Cache hit rate (0.0 - 1.0)
52    pub cache_hit_rate: f64,
53    /// Start timestamp
54    pub start_time: Option<Instant>,
55    /// End timestamp
56    pub end_time: Option<Instant>,
57}
58
59impl QueryStats {
60    /// Create new query statistics
61    pub fn new(query_id: String) -> Self {
62        Self {
63            query_id,
64            total_duration: Duration::default(),
65            phase_durations: HashMap::new(),
66            triples_matched: 0,
67            results_count: 0,
68            peak_memory: 0,
69            joins_performed: 0,
70            cache_hit_rate: 0.0,
71            start_time: None,
72            end_time: None,
73        }
74    }
75
76    /// Get duration for a specific phase
77    pub fn phase_duration(&self, phase: QueryPhase) -> Duration {
78        self.phase_durations
79            .get(&phase)
80            .copied()
81            .unwrap_or_default()
82    }
83
84    /// Get throughput (results per second)
85    pub fn throughput(&self) -> f64 {
86        if self.total_duration.as_secs_f64() > 0.0 {
87            self.results_count as f64 / self.total_duration.as_secs_f64()
88        } else {
89            0.0
90        }
91    }
92
93    /// Get MB processed per second
94    pub fn mb_per_second(&self) -> f64 {
95        let mb = self.peak_memory as f64 / (1024.0 * 1024.0);
96        if self.total_duration.as_secs_f64() > 0.0 {
97            mb / self.total_duration.as_secs_f64()
98        } else {
99            0.0
100        }
101    }
102
103    /// Generate human-readable report
104    pub fn report(&self) -> String {
105        let mut lines = vec![
106            format!("Query Execution Statistics: {}", self.query_id),
107            format!(
108                "  Total Duration: {:.3}s",
109                self.total_duration.as_secs_f64()
110            ),
111        ];
112
113        // Phase breakdowns
114        for phase in &[
115            QueryPhase::Parsing,
116            QueryPhase::Planning,
117            QueryPhase::Execution,
118            QueryPhase::Materialization,
119        ] {
120            if let Some(duration) = self.phase_durations.get(phase) {
121                let percentage = if self.total_duration.as_secs_f64() > 0.0 {
122                    (duration.as_secs_f64() / self.total_duration.as_secs_f64()) * 100.0
123                } else {
124                    0.0
125                };
126                lines.push(format!(
127                    "    {}: {:.3}s ({:.1}%)",
128                    phase.name(),
129                    duration.as_secs_f64(),
130                    percentage
131                ));
132            }
133        }
134
135        lines.extend(vec![
136            format!("  Triples Matched: {}", self.triples_matched),
137            format!("  Results: {}", self.results_count),
138            format!("  Joins: {}", self.joins_performed),
139            format!("  Cache Hit Rate: {:.1}%", self.cache_hit_rate * 100.0),
140            format!(
141                "  Peak Memory: {:.2} MB",
142                self.peak_memory as f64 / (1024.0 * 1024.0)
143            ),
144            format!("  Throughput: {:.0} results/s", self.throughput()),
145        ]);
146
147        lines.join("\n")
148    }
149}
150
151/// SPARQL query profiler
152pub struct QueryProfiler {
153    /// Current query being profiled
154    current_query: Option<QueryStats>,
155    /// Phase start times
156    phase_start: HashMap<QueryPhase, Instant>,
157    /// Whether profiling is enabled
158    enabled: bool,
159    /// History of profiled queries (limited to last N)
160    history: Vec<QueryStats>,
161    /// Maximum history size
162    max_history: usize,
163}
164
165impl QueryProfiler {
166    /// Create a new query profiler
167    pub fn new() -> Self {
168        Self {
169            current_query: None,
170            phase_start: HashMap::new(),
171            enabled: true,
172            history: Vec::new(),
173            max_history: 100,
174        }
175    }
176
177    /// Create a disabled profiler (no overhead)
178    pub fn disabled() -> Self {
179        Self {
180            current_query: None,
181            phase_start: HashMap::new(),
182            enabled: false,
183            history: Vec::new(),
184            max_history: 0,
185        }
186    }
187
188    /// Enable profiling
189    pub fn enable(&mut self) {
190        self.enabled = true;
191    }
192
193    /// Disable profiling
194    pub fn disable(&mut self) {
195        self.enabled = false;
196    }
197
198    /// Check if profiling is enabled
199    pub fn is_enabled(&self) -> bool {
200        self.enabled
201    }
202
203    /// Start profiling a query
204    pub fn start_query(&mut self, query_id: String) {
205        if !self.enabled {
206            return;
207        }
208
209        let mut stats = QueryStats::new(query_id);
210        stats.start_time = Some(Instant::now());
211        self.current_query = Some(stats);
212    }
213
214    /// Start a query phase
215    pub fn start_phase(&mut self, phase: QueryPhase) {
216        if !self.enabled || self.current_query.is_none() {
217            return;
218        }
219
220        self.phase_start.insert(phase, Instant::now());
221    }
222
223    /// End a query phase
224    pub fn end_phase(&mut self, phase: QueryPhase) {
225        if !self.enabled || self.current_query.is_none() {
226            return;
227        }
228
229        if let Some(start) = self.phase_start.remove(&phase) {
230            let duration = start.elapsed();
231            if let Some(ref mut stats) = self.current_query {
232                stats.phase_durations.insert(phase, duration);
233            }
234        }
235    }
236
237    /// Record triples matched
238    pub fn record_triples(&mut self, count: usize) {
239        if let Some(ref mut stats) = self.current_query {
240            stats.triples_matched += count;
241        }
242    }
243
244    /// Record results count
245    pub fn record_results(&mut self, count: usize) {
246        if let Some(ref mut stats) = self.current_query {
247            stats.results_count = count;
248        }
249    }
250
251    /// Record memory usage
252    pub fn record_memory(&mut self, bytes: usize) {
253        if let Some(ref mut stats) = self.current_query {
254            stats.peak_memory = stats.peak_memory.max(bytes);
255        }
256    }
257
258    /// Record join operation
259    pub fn record_join(&mut self) {
260        if let Some(ref mut stats) = self.current_query {
261            stats.joins_performed += 1;
262        }
263    }
264
265    /// Record cache hit rate
266    pub fn record_cache_hit_rate(&mut self, rate: f64) {
267        if let Some(ref mut stats) = self.current_query {
268            stats.cache_hit_rate = rate.clamp(0.0, 1.0);
269        }
270    }
271
272    /// End profiling current query
273    pub fn end_query(&mut self) -> Option<QueryStats> {
274        if !self.enabled {
275            return None;
276        }
277
278        if let Some(mut stats) = self.current_query.take() {
279            stats.end_time = Some(Instant::now());
280            if let (Some(start), Some(end)) = (stats.start_time, stats.end_time) {
281                stats.total_duration = end.duration_since(start);
282            }
283
284            // Add to history
285            if self.history.len() >= self.max_history {
286                self.history.remove(0);
287            }
288            self.history.push(stats.clone());
289
290            Some(stats)
291        } else {
292            None
293        }
294    }
295
296    /// Get current query statistics
297    pub fn current_stats(&self) -> Option<&QueryStats> {
298        self.current_query.as_ref()
299    }
300
301    /// Get query history
302    pub fn history(&self) -> &[QueryStats] {
303        &self.history
304    }
305
306    /// Clear history
307    pub fn clear_history(&mut self) {
308        self.history.clear();
309    }
310
311    /// Get average statistics across all queries in history
312    pub fn average_stats(&self) -> Option<AverageStats> {
313        if self.history.is_empty() {
314            return None;
315        }
316
317        let count = self.history.len() as f64;
318        let total_duration: Duration = self.history.iter().map(|s| s.total_duration).sum();
319        let avg_triples = self
320            .history
321            .iter()
322            .map(|s| s.triples_matched)
323            .sum::<usize>() as f64
324            / count;
325        let avg_results =
326            self.history.iter().map(|s| s.results_count).sum::<usize>() as f64 / count;
327        let avg_joins = self
328            .history
329            .iter()
330            .map(|s| s.joins_performed)
331            .sum::<usize>() as f64
332            / count;
333        let avg_cache_hit = self.history.iter().map(|s| s.cache_hit_rate).sum::<f64>() / count;
334        let avg_memory = self.history.iter().map(|s| s.peak_memory).sum::<usize>() as f64 / count;
335
336        Some(AverageStats {
337            query_count: self.history.len(),
338            avg_duration: Duration::from_secs_f64(total_duration.as_secs_f64() / count),
339            avg_triples_matched: avg_triples,
340            avg_results_count: avg_results,
341            avg_joins_performed: avg_joins,
342            avg_cache_hit_rate: avg_cache_hit,
343            avg_peak_memory: avg_memory,
344        })
345    }
346
347    /// Generate summary report
348    pub fn summary_report(&self) -> String {
349        if self.history.is_empty() {
350            return "No query history available".to_string();
351        }
352
353        let mut lines = vec![format!(
354            "Query Profiler Summary ({} queries)",
355            self.history.len()
356        )];
357
358        if let Some(avg) = self.average_stats() {
359            lines.push(avg.report());
360        }
361
362        lines.join("\n")
363    }
364}
365
366impl Default for QueryProfiler {
367    fn default() -> Self {
368        Self::new()
369    }
370}
371
372/// Average statistics across multiple queries
373#[derive(Debug, Clone)]
374pub struct AverageStats {
375    /// Number of queries
376    pub query_count: usize,
377    /// Average duration
378    pub avg_duration: Duration,
379    /// Average triples matched
380    pub avg_triples_matched: f64,
381    /// Average results count
382    pub avg_results_count: f64,
383    /// Average joins performed
384    pub avg_joins_performed: f64,
385    /// Average cache hit rate
386    pub avg_cache_hit_rate: f64,
387    /// Average peak memory
388    pub avg_peak_memory: f64,
389}
390
391impl AverageStats {
392    /// Generate report
393    pub fn report(&self) -> String {
394        format!(
395            "Average Statistics:\n\
396             - Queries: {}\n\
397             - Duration: {:.3}s\n\
398             - Triples Matched: {:.0}\n\
399             - Results: {:.0}\n\
400             - Joins: {:.1}\n\
401             - Cache Hit Rate: {:.1}%\n\
402             - Peak Memory: {:.2} MB",
403            self.query_count,
404            self.avg_duration.as_secs_f64(),
405            self.avg_triples_matched,
406            self.avg_results_count,
407            self.avg_joins_performed,
408            self.avg_cache_hit_rate * 100.0,
409            self.avg_peak_memory / (1024.0 * 1024.0)
410        )
411    }
412}
413
414#[cfg(test)]
415mod tests {
416    use super::*;
417    use std::thread;
418    use std::time::Duration;
419
420    #[test]
421    fn test_query_stats_creation() {
422        let stats = QueryStats::new("test_query".to_string());
423        assert_eq!(stats.query_id, "test_query");
424        assert_eq!(stats.triples_matched, 0);
425        assert_eq!(stats.results_count, 0);
426    }
427
428    #[test]
429    fn test_query_phase_duration() {
430        let mut stats = QueryStats::new("test".to_string());
431        stats
432            .phase_durations
433            .insert(QueryPhase::Parsing, Duration::from_millis(100));
434        stats
435            .phase_durations
436            .insert(QueryPhase::Execution, Duration::from_millis(500));
437
438        assert_eq!(stats.phase_duration(QueryPhase::Parsing).as_millis(), 100);
439        assert_eq!(stats.phase_duration(QueryPhase::Execution).as_millis(), 500);
440        assert_eq!(stats.phase_duration(QueryPhase::Planning).as_millis(), 0);
441    }
442
443    #[test]
444    fn test_profiler_basic() {
445        let mut profiler = QueryProfiler::new();
446        assert!(profiler.is_enabled());
447
448        profiler.start_query("SELECT * WHERE { ?s ?p ?o }".to_string());
449        thread::sleep(Duration::from_millis(10));
450
451        profiler.record_triples(100);
452        profiler.record_results(50);
453        profiler.record_join();
454
455        let stats = profiler.end_query().unwrap();
456        assert_eq!(stats.triples_matched, 100);
457        assert_eq!(stats.results_count, 50);
458        assert_eq!(stats.joins_performed, 1);
459        assert!(stats.total_duration.as_millis() >= 10);
460    }
461
462    #[test]
463    fn test_profiler_phases() {
464        let mut profiler = QueryProfiler::new();
465
466        profiler.start_query("test".to_string());
467
468        profiler.start_phase(QueryPhase::Parsing);
469        thread::sleep(Duration::from_millis(10));
470        profiler.end_phase(QueryPhase::Parsing);
471
472        profiler.start_phase(QueryPhase::Execution);
473        thread::sleep(Duration::from_millis(20));
474        profiler.end_phase(QueryPhase::Execution);
475
476        let stats = profiler.end_query().unwrap();
477        assert!(stats.phase_duration(QueryPhase::Parsing).as_millis() >= 10);
478        assert!(stats.phase_duration(QueryPhase::Execution).as_millis() >= 20);
479    }
480
481    #[test]
482    fn test_disabled_profiler() {
483        let mut profiler = QueryProfiler::disabled();
484        assert!(!profiler.is_enabled());
485
486        profiler.start_query("test".to_string());
487        profiler.record_triples(100);
488
489        let stats = profiler.end_query();
490        assert!(stats.is_none());
491    }
492
493    #[test]
494    fn test_profiler_history() {
495        let mut profiler = QueryProfiler::new();
496
497        for i in 0..5 {
498            profiler.start_query(format!("query_{}", i));
499            profiler.record_results(i * 10);
500            profiler.end_query();
501        }
502
503        assert_eq!(profiler.history().len(), 5);
504        assert_eq!(profiler.history()[0].results_count, 0);
505        assert_eq!(profiler.history()[4].results_count, 40);
506    }
507
508    #[test]
509    fn test_average_stats() {
510        let mut profiler = QueryProfiler::new();
511
512        for i in 1..=3 {
513            profiler.start_query(format!("query_{}", i));
514            profiler.record_triples(i * 100);
515            profiler.record_results(i * 10);
516            profiler.end_query();
517        }
518
519        let avg = profiler.average_stats().unwrap();
520        assert_eq!(avg.query_count, 3);
521        assert_eq!(avg.avg_triples_matched, 200.0); // (100 + 200 + 300) / 3
522        assert_eq!(avg.avg_results_count, 20.0); // (10 + 20 + 30) / 3
523    }
524
525    #[test]
526    fn test_throughput() {
527        let mut stats = QueryStats::new("test".to_string());
528        stats.results_count = 1000;
529        stats.total_duration = Duration::from_secs(2);
530
531        assert_eq!(stats.throughput(), 500.0);
532    }
533
534    #[test]
535    fn test_cache_hit_rate_clamping() {
536        let mut profiler = QueryProfiler::new();
537        profiler.start_query("test".to_string());
538
539        profiler.record_cache_hit_rate(1.5); // Should clamp to 1.0
540        let stats = profiler.end_query().unwrap();
541        assert_eq!(stats.cache_hit_rate, 1.0);
542    }
543}