1use crate::OxirsError;
31use parking_lot::RwLock;
32use scirs2_core::metrics::{Counter, Histogram, MetricsRegistry, Timer};
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::Arc;
36use std::time::Instant;
37
/// Tunable settings for the query profiler.
///
/// A copy of this configuration is given to every profiling session, where the
/// `track_memory` and `profile_*` switches decide which measurements are recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfilerConfig {
    /// Detailed-profiling switch.
    ///
    /// NOTE(review): not read anywhere in the visible part of this file; confirm
    /// where (or whether) it is consumed.
    pub enable_detailed: bool,

    /// When `true`, a session samples process memory as it finishes.
    pub track_memory: bool,

    /// When `true`, sessions count pattern usage; otherwise pattern recordings are ignored.
    pub profile_patterns: bool,

    /// When `true`, sessions count join operations; otherwise join recordings are ignored.
    pub profile_joins: bool,

    /// When `true`, sessions count index accesses; otherwise index recordings are ignored.
    pub profile_indexes: bool,

    /// Maximum number of profiled queries kept in the profiler's history.
    pub max_history: usize,

    /// Duration in milliseconds at or above which a query's total time marks it as slow.
    pub slow_query_threshold_ms: u64,

    /// Sampling rate setting (defaults to `1.0`).
    ///
    /// NOTE(review): presumably the fraction of queries to profile, but nothing in
    /// the visible code reads it; confirm against callers.
    pub sample_rate: f32,
}
65
66impl Default for ProfilerConfig {
67 fn default() -> Self {
68 Self {
69 enable_detailed: true,
70 track_memory: true,
71 profile_patterns: true,
72 profile_joins: true,
73 profile_indexes: true,
74 max_history: 1000,
75 slow_query_threshold_ms: 1000,
76 sample_rate: 1.0,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct QueryStatistics {
84 pub total_time_ms: u64,
86
87 pub parse_time_ms: u64,
89
90 pub planning_time_ms: u64,
92
93 pub execution_time_ms: u64,
95
96 pub triples_matched: u64,
98
99 pub results_count: u64,
101
102 pub peak_memory_bytes: u64,
104
105 pub pattern_matches: HashMap<String, u64>,
107
108 pub index_accesses: HashMap<String, u64>,
110
111 pub join_operations: u64,
113
114 pub cache_hit_rate: f32,
116
117 pub plan_hash: u64,
119
120 pub timestamp: u64,
122}
123
124impl Default for QueryStatistics {
125 fn default() -> Self {
126 Self {
127 total_time_ms: 0,
128 parse_time_ms: 0,
129 planning_time_ms: 0,
130 execution_time_ms: 0,
131 triples_matched: 0,
132 results_count: 0,
133 peak_memory_bytes: 0,
134 pattern_matches: HashMap::new(),
135 index_accesses: HashMap::new(),
136 join_operations: 0,
137 cache_hit_rate: 0.0,
138 plan_hash: 0,
139 timestamp: 0,
140 }
141 }
142}
143
/// Aggregate figures computed over the profiler's retained query history.
///
/// All `*_execution_time_ms` fields are derived from each query's
/// `total_time_ms`, not from its `execution_time_ms` phase timing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfilingStatistics {
    /// Number of queries currently held in the history.
    pub total_queries: u64,

    /// Arithmetic mean of the queries' total times, in milliseconds.
    pub avg_execution_time_ms: f64,

    /// Element at index `len / 2` of the sorted total times (the upper median
    /// for an even number of queries).
    pub median_execution_time_ms: f64,

    /// Element at index `floor(len * 0.95)` of the sorted total times.
    pub p95_execution_time_ms: f64,

    /// Element at index `floor(len * 0.99)` of the sorted total times.
    pub p99_execution_time_ms: f64,

    /// Largest total time in the history, in milliseconds.
    pub max_execution_time_ms: u64,

    /// Smallest total time in the history, in milliseconds.
    ///
    /// The `Default` value (returned when the history is empty) is `u64::MAX`.
    pub min_execution_time_ms: u64,

    /// Sum of matched triples over all queries in the history.
    pub total_triples_matched: u64,

    /// Mean number of matched triples per query.
    pub avg_triples_per_query: f64,

    /// Up to ten `(pattern, count)` pairs with the highest summed counts, descending.
    pub top_patterns: Vec<(String, u64)>,

    /// Up to ten `(index, count)` pairs with the highest summed counts, descending.
    pub top_indexes: Vec<(String, u64)>,

    /// Unweighted mean of the per-query cache hit rates.
    pub overall_cache_hit_rate: f32,

    /// Number of queries in the history that were flagged as slow.
    pub slow_query_count: u64,
}
186
187impl Default for ProfilingStatistics {
188 fn default() -> Self {
189 Self {
190 total_queries: 0,
191 avg_execution_time_ms: 0.0,
192 median_execution_time_ms: 0.0,
193 p95_execution_time_ms: 0.0,
194 p99_execution_time_ms: 0.0,
195 max_execution_time_ms: 0,
196 min_execution_time_ms: u64::MAX,
197 total_triples_matched: 0,
198 avg_triples_per_query: 0.0,
199 top_patterns: Vec::new(),
200 top_indexes: Vec::new(),
201 overall_cache_hit_rate: 0.0,
202 slow_query_count: 0,
203 }
204 }
205}
206
/// One entry of the profiler's history: a query together with its measurements
/// and the analysis derived from them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfiledQuery {
    /// Text of the query as supplied by the caller.
    pub query_text: String,

    /// Measurements recorded for this query.
    pub statistics: QueryStatistics,

    /// Caller-supplied label for the kind of query (the tests use `"SELECT"`).
    pub query_type: String,

    /// `true` when the query's total time reached the configured slow-query threshold.
    pub is_slow: bool,

    /// Human-readable optimization suggestions generated when the query was recorded.
    pub optimization_hints: Vec<String>,
}
225
/// In-flight profiling state for a single query.
///
/// Created by the profiler when a session starts, and consumed by `finish`,
/// which returns the collected `QueryStatistics`.
pub struct QueryProfilingSession {
    /// Text of the query being profiled (stored but currently never read).
    #[allow(dead_code)]
    query_text: String,

    /// Moment the session was created; used to compute the total time.
    start_time: Instant,

    /// Measurements accumulated so far.
    statistics: QueryStatistics,

    /// Start instants of phases that have been started but not yet ended, keyed by phase name.
    timers: HashMap<String, Instant>,

    /// Copy of the profiler configuration that gates which measurements are recorded.
    config: ProfilerConfig,

    /// Identifier generated for this session (stored but currently never read).
    #[allow(dead_code)]
    session_id: String,
}
248
249impl QueryProfilingSession {
250 pub fn start_phase(&mut self, phase: &str) {
252 self.timers.insert(phase.to_string(), Instant::now());
253 }
254
255 pub fn end_phase(&mut self, phase: &str) {
257 if let Some(start) = self.timers.remove(phase) {
258 let duration = start.elapsed();
259 let duration_ms = duration.as_millis() as u64;
260
261 match phase {
262 "parse" => self.statistics.parse_time_ms = duration_ms,
263 "planning" => self.statistics.planning_time_ms = duration_ms,
264 "execution" => self.statistics.execution_time_ms = duration_ms,
265 _ => {}
266 }
267 }
268 }
269
270 pub fn record_pattern(&mut self, pattern: String) {
272 if self.config.profile_patterns {
273 *self.statistics.pattern_matches.entry(pattern).or_insert(0) += 1;
274 }
275 }
276
277 pub fn record_index_access(&mut self, index_name: String) {
279 if self.config.profile_indexes {
280 *self
281 .statistics
282 .index_accesses
283 .entry(index_name)
284 .or_insert(0) += 1;
285 }
286 }
287
288 pub fn record_join(&mut self) {
290 if self.config.profile_joins {
291 self.statistics.join_operations += 1;
292 }
293 }
294
295 pub fn record_triples_matched(&mut self, count: u64) {
297 self.statistics.triples_matched += count;
298 }
299
300 pub fn record_results(&mut self, count: u64) {
302 self.statistics.results_count = count;
303 }
304
305 pub fn record_cache_access(&mut self, hit: bool) {
307 let total = self.statistics.triples_matched as f32;
309 if total > 0.0 {
310 let hits = if hit { 1.0 } else { 0.0 };
311 self.statistics.cache_hit_rate =
312 (self.statistics.cache_hit_rate * (total - 1.0) + hits) / total;
313 }
314 }
315
316 pub fn set_plan_hash(&mut self, hash: u64) {
318 self.statistics.plan_hash = hash;
319 }
320
321 pub fn finish(mut self) -> QueryStatistics {
323 let total_duration = self.start_time.elapsed();
324 self.statistics.total_time_ms = total_duration.as_millis() as u64;
325 self.statistics.timestamp = std::time::SystemTime::now()
326 .duration_since(std::time::UNIX_EPOCH)
327 .unwrap_or_default()
328 .as_secs();
329
330 if self.config.track_memory {
332 #[cfg(target_os = "linux")]
335 {
336 if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
337 for line in status.lines() {
338 if line.starts_with("VmRSS:") {
339 if let Some(kb_str) = line.split_whitespace().nth(1) {
340 if let Ok(kb) = kb_str.parse::<u64>() {
341 self.statistics.peak_memory_bytes = kb * 1024;
342 }
343 }
344 break;
345 }
346 }
347 }
348 }
349
350 #[cfg(target_os = "macos")]
351 {
352 use std::mem;
354 extern "C" {
355 fn mach_task_self() -> u32;
356 fn task_info(
357 task: u32,
358 flavor: u32,
359 task_info: *mut u8,
360 count: *mut u32,
361 ) -> i32;
362 }
363
364 const MACH_TASK_BASIC_INFO: u32 = 20;
365 const MACH_TASK_BASIC_INFO_COUNT: u32 = 10;
366
367 #[repr(C)]
368 struct MachTaskBasicInfo {
369 virtual_size: u64,
370 resident_size: u64,
371 }
373
374 unsafe {
375 let mut info: MachTaskBasicInfo = mem::zeroed();
376 let mut count = MACH_TASK_BASIC_INFO_COUNT;
377 let result = task_info(
378 mach_task_self(),
379 MACH_TASK_BASIC_INFO,
380 &mut info as *mut _ as *mut u8,
381 &mut count,
382 );
383 if result == 0 {
384 self.statistics.peak_memory_bytes = info.resident_size;
385 }
386 }
387 }
388
389 #[cfg(target_os = "windows")]
390 {
391 use std::mem;
393 #[repr(C)]
394 struct ProcessMemoryCounters {
395 cb: u32,
396 page_fault_count: u32,
397 peak_working_set_size: usize,
398 working_set_size: usize,
399 quota_peak_paged_pool_usage: usize,
400 quota_paged_pool_usage: usize,
401 quota_peak_non_paged_pool_usage: usize,
402 quota_non_paged_pool_usage: usize,
403 pagefile_usage: usize,
404 peak_pagefile_usage: usize,
405 }
406
407 extern "system" {
408 fn GetCurrentProcess() -> *mut std::ffi::c_void;
409 fn K32GetProcessMemoryInfo(
410 process: *mut std::ffi::c_void,
411 counters: *mut ProcessMemoryCounters,
412 cb: u32,
413 ) -> i32;
414 }
415
416 unsafe {
417 let mut counters: ProcessMemoryCounters = mem::zeroed();
418 counters.cb = mem::size_of::<ProcessMemoryCounters>() as u32;
419 let result =
420 K32GetProcessMemoryInfo(GetCurrentProcess(), &mut counters, counters.cb);
421 if result != 0 {
422 self.statistics.peak_memory_bytes = counters.working_set_size as u64;
423 }
424 }
425 }
426 }
427
428 self.statistics
429 }
430}
431
/// Collects per-query statistics, keeps a bounded history of profiled queries
/// and derives aggregate statistics and optimization hints from it.
pub struct QueryProfiler {
    /// Settings applied to this profiler and copied into each session it starts.
    config: ProfilerConfig,

    /// Recorded queries, oldest first, behind a read-write lock.
    history: Arc<RwLock<Vec<ProfiledQuery>>>,

    /// Metrics registry created together with the profiler (currently never read).
    #[allow(dead_code)]
    metrics: Arc<MetricsRegistry>,

    /// Timer metric fed with each recorded query's total time.
    query_timer: Arc<Timer>,

    /// Counter metric incremented whenever a session is started.
    query_counter: Arc<Counter>,

    /// Histogram metric fed with each recorded query's matched-triple count.
    triples_histogram: Arc<Histogram>,
}
453
454impl QueryProfiler {
455 pub fn new(config: ProfilerConfig) -> Self {
457 let metrics = Arc::new(MetricsRegistry::new());
458
459 let query_timer = Arc::new(Timer::new("query_execution_time".to_string()));
460 let query_counter = Arc::new(Counter::new("total_queries".to_string()));
461 let triples_histogram = Arc::new(Histogram::new("triples_matched".to_string()));
462
463 Self {
464 config,
465 history: Arc::new(RwLock::new(Vec::new())),
466 metrics,
467 query_timer,
468 query_counter,
469 triples_histogram,
470 }
471 }
472
473 pub fn start_session(&self, query_text: &str) -> QueryProfilingSession {
475 self.query_counter.inc();
477
478 let session_id = format!("query_{}", fastrand::u64(..));
480
481 QueryProfilingSession {
482 query_text: query_text.to_string(),
483 start_time: Instant::now(),
484 statistics: QueryStatistics::default(),
485 timers: HashMap::new(),
486 config: self.config.clone(),
487 session_id,
488 }
489 }
490
491 pub fn record_query(
493 &self,
494 query_text: String,
495 statistics: QueryStatistics,
496 query_type: String,
497 ) {
498 self.query_timer
500 .observe(std::time::Duration::from_millis(statistics.total_time_ms));
501 self.triples_histogram
502 .observe(statistics.triples_matched as f64);
503
504 let is_slow = statistics.total_time_ms >= self.config.slow_query_threshold_ms;
506
507 let optimization_hints = self.identify_optimization_hints(&statistics);
509
510 let profiled = ProfiledQuery {
511 query_text,
512 statistics,
513 query_type,
514 is_slow,
515 optimization_hints,
516 };
517
518 let mut history = self.history.write();
520 history.push(profiled);
521
522 if history.len() > self.config.max_history {
524 history.remove(0);
525 }
526 }
527
528 pub fn get_statistics(&self) -> ProfilingStatistics {
530 let history = self.history.read();
531
532 if history.is_empty() {
533 return ProfilingStatistics::default();
534 }
535
536 let mut times: Vec<u64> = history.iter().map(|q| q.statistics.total_time_ms).collect();
537 times.sort_unstable();
538
539 let total_queries = history.len() as u64;
540 let sum_time: u64 = times.iter().sum();
541 let avg_time = sum_time as f64 / total_queries as f64;
542
543 let median = times[times.len() / 2];
544 let p95_idx = (times.len() as f64 * 0.95) as usize;
545 let p99_idx = (times.len() as f64 * 0.99) as usize;
546 let p95 = times.get(p95_idx).copied().unwrap_or(0);
547 let p99 = times.get(p99_idx).copied().unwrap_or(0);
548
549 let total_triples: u64 = history.iter().map(|q| q.statistics.triples_matched).sum();
550 let avg_triples = total_triples as f64 / total_queries as f64;
551
552 let slow_count = history.iter().filter(|q| q.is_slow).count() as u64;
553
554 let mut pattern_counts: HashMap<String, u64> = HashMap::new();
556 for query in history.iter() {
557 for (pattern, count) in &query.statistics.pattern_matches {
558 *pattern_counts.entry(pattern.clone()).or_insert(0) += count;
559 }
560 }
561 let mut top_patterns: Vec<_> = pattern_counts.into_iter().collect();
562 top_patterns.sort_by_key(|(_, count)| std::cmp::Reverse(*count));
563 top_patterns.truncate(10);
564
565 let mut index_counts: HashMap<String, u64> = HashMap::new();
567 for query in history.iter() {
568 for (index, count) in &query.statistics.index_accesses {
569 *index_counts.entry(index.clone()).or_insert(0) += count;
570 }
571 }
572 let mut top_indexes: Vec<_> = index_counts.into_iter().collect();
573 top_indexes.sort_by_key(|(_, count)| std::cmp::Reverse(*count));
574 top_indexes.truncate(10);
575
576 let total_cache_hits: f32 = history.iter().map(|q| q.statistics.cache_hit_rate).sum();
578 let overall_cache_hit_rate = total_cache_hits / total_queries as f32;
579
580 ProfilingStatistics {
581 total_queries,
582 avg_execution_time_ms: avg_time,
583 median_execution_time_ms: median as f64,
584 p95_execution_time_ms: p95 as f64,
585 p99_execution_time_ms: p99 as f64,
586 max_execution_time_ms: *times.last().unwrap_or(&0),
587 min_execution_time_ms: *times.first().unwrap_or(&0),
588 total_triples_matched: total_triples,
589 avg_triples_per_query: avg_triples,
590 top_patterns,
591 top_indexes,
592 overall_cache_hit_rate,
593 slow_query_count: slow_count,
594 }
595 }
596
597 pub fn get_slow_queries(&self, limit: usize) -> Vec<ProfiledQuery> {
599 let history = self.history.read();
600 history
601 .iter()
602 .filter(|q| q.is_slow)
603 .rev()
604 .take(limit)
605 .cloned()
606 .collect()
607 }
608
609 pub fn clear_history(&self) {
611 self.history.write().clear();
612 }
613
614 pub fn export_json(&self) -> Result<String, OxirsError> {
616 let stats = self.get_statistics();
617 serde_json::to_string_pretty(&stats).map_err(|e| {
618 OxirsError::Serialize(format!("Failed to serialize profiling data: {}", e))
619 })
620 }
621
622 fn identify_optimization_hints(&self, stats: &QueryStatistics) -> Vec<String> {
624 let mut hints = Vec::new();
625
626 if stats.execution_time_ms > self.config.slow_query_threshold_ms {
628 hints.push(format!(
629 "â ī¸ Slow execution ({}ms) - consider adding indexes or optimizing patterns",
630 stats.execution_time_ms
631 ));
632
633 if stats.parse_time_ms > stats.execution_time_ms / 4 {
635 hints.push(format!(
636 "đĄ High parse time ({}ms, {:.1}% of total) - consider caching parsed queries",
637 stats.parse_time_ms,
638 (stats.parse_time_ms as f64 / stats.total_time_ms as f64) * 100.0
639 ));
640 }
641
642 if stats.planning_time_ms > stats.execution_time_ms / 4 {
644 hints.push(format!(
645 "đĄ High planning time ({}ms, {:.1}% of total) - enable query plan caching",
646 stats.planning_time_ms,
647 (stats.planning_time_ms as f64 / stats.total_time_ms as f64) * 100.0
648 ));
649 }
650 }
651
652 if stats.cache_hit_rate < 0.5 {
654 hints.push(format!(
655 "đž Low cache hit rate ({:.1}%) - query may benefit from result caching",
656 stats.cache_hit_rate * 100.0
657 ));
658 } else if stats.cache_hit_rate > 0.9 {
659 hints.push(format!(
660 "â
Excellent cache hit rate ({:.1}%) - caching is working well",
661 stats.cache_hit_rate * 100.0
662 ));
663 }
664
665 if stats.join_operations > 10 {
667 hints.push(format!(
668 "đ Many join operations ({}) - consider reordering patterns for better selectivity",
669 stats.join_operations
670 ));
671
672 if stats.join_operations > 20 {
674 hints.push(
675 "đĄ Excessive joins - break query into smaller subqueries or use UNION instead"
676 .to_string(),
677 );
678 }
679 }
680
681 if stats.triples_matched > 10000 && stats.results_count < 100 {
683 let selectivity = stats.results_count as f64 / stats.triples_matched as f64;
684 hints.push(format!(
685 "đ¯ High selectivity gap (matched {} triples, returned {} results, {:.3}% selectivity) - add more selective patterns early",
686 stats.triples_matched, stats.results_count, selectivity * 100.0
687 ));
688 }
689
690 if !stats.pattern_matches.is_empty() {
692 if let Some((pattern, count)) = stats.pattern_matches.iter().max_by_key(|(_, c)| *c) {
694 if *count > stats.pattern_matches.len() as u64 * 2 {
695 hints.push(format!(
696 "đ Pattern '{}' heavily used ({} times) - ensure it has appropriate index",
697 pattern, count
698 ));
699 }
700 }
701 }
702
703 if !stats.index_accesses.is_empty() {
705 let total_accesses: u64 = stats.index_accesses.values().sum();
706 if total_accesses > 1000 {
707 hints.push(format!(
708 "đī¸ High index access count ({}) - consider index consolidation or query simplification",
709 total_accesses
710 ));
711 }
712
713 if stats.pattern_matches.len() > stats.index_accesses.len() {
715 hints.push(
716 "đĄ Some patterns may not be using indexes - review query structure"
717 .to_string(),
718 );
719 }
720 }
721
722 if stats.peak_memory_bytes > 100 * 1024 * 1024 {
724 hints.push(format!(
726 "đž High memory usage ({:.1}MB) - consider streaming results or pagination",
727 stats.peak_memory_bytes as f64 / (1024.0 * 1024.0)
728 ));
729 }
730
731 if stats.results_count == 0 {
733 hints.push(
734 "âšī¸ Query returned no results - verify query logic and data availability"
735 .to_string(),
736 );
737 } else if stats.results_count > 10000 {
738 hints.push(format!(
739 "đ Large result set ({} results) - consider adding LIMIT clause or pagination",
740 stats.results_count
741 ));
742 }
743
744 let efficiency_score = if stats.triples_matched > 0 {
746 (stats.results_count as f64 / stats.triples_matched as f64) * 1000.0
747 / stats.total_time_ms as f64
748 } else {
749 0.0
750 };
751
752 if efficiency_score < 0.1 && stats.results_count > 0 {
753 hints.push(
754 "⥠Low query efficiency - review overall query structure and indexing strategy"
755 .to_string(),
756 );
757 }
758
759 hints
760 }
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    /// Query text shared by the session-based tests.
    const SAMPLE_QUERY: &str = "SELECT * WHERE { ?s ?p ?o }";

    /// Builds a profiler that uses the default configuration.
    fn default_profiler() -> QueryProfiler {
        QueryProfiler::new(ProfilerConfig::default())
    }

    /// Blocks the current thread for ten milliseconds.
    fn pause() {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    /// A new profiler reports an empty history.
    #[test]
    fn test_profiler_creation() {
        let profiler = default_profiler();

        assert_eq!(profiler.get_statistics().total_queries, 0);
    }

    /// Phases, matched triples and results all end up in the final statistics.
    #[test]
    fn test_session_lifecycle() {
        let profiler = default_profiler();
        let mut session = profiler.start_session(SAMPLE_QUERY);

        session.start_phase("parse");
        pause();
        session.end_phase("parse");

        session.start_phase("planning");
        pause();
        session.end_phase("planning");

        session.start_phase("execution");
        session.record_triples_matched(100);
        session.record_results(10);
        pause();
        session.end_phase("execution");

        let outcome = session.finish();

        assert!(outcome.total_time_ms >= 30);
        assert_eq!(outcome.triples_matched, 100);
        assert_eq!(outcome.results_count, 10);
    }

    /// Repeated patterns are counted per key.
    #[test]
    fn test_pattern_recording() {
        let profiler = default_profiler();
        let mut session = profiler.start_session(SAMPLE_QUERY);

        for pattern in &["SPO", "SPO", "POS"] {
            session.record_pattern(pattern.to_string());
        }

        let outcome = session.finish();

        assert_eq!(outcome.pattern_matches.get("SPO"), Some(&2));
        assert_eq!(outcome.pattern_matches.get("POS"), Some(&1));
    }

    /// A slow, join-heavy, poorly cached query triggers the matching hints.
    #[test]
    fn test_optimization_hints() {
        let profiler = QueryProfiler::new(ProfilerConfig {
            slow_query_threshold_ms: 100,
            ..Default::default()
        });

        let slow_query = QueryStatistics {
            total_time_ms: 200,
            execution_time_ms: 200,
            cache_hit_rate: 0.3,
            join_operations: 15,
            triples_matched: 50000,
            results_count: 50,
            ..Default::default()
        };

        let hints = profiler.identify_optimization_hints(&slow_query);

        assert!(!hints.is_empty());
        let expected_fragments = [
            "Slow execution",
            "Low cache hit rate",
            "Many join operations",
            "High selectivity gap",
        ];
        for fragment in &expected_fragments {
            assert!(hints.iter().any(|hint| hint.contains(*fragment)));
        }
    }

    /// Aggregates over ten recorded queries are non-trivial.
    #[test]
    fn test_statistics_aggregation() {
        let profiler = default_profiler();

        for step in 0..10 {
            let recorded = QueryStatistics {
                total_time_ms: 100 + step * 10,
                triples_matched: 1000 + step * 100,
                ..Default::default()
            };

            profiler.record_query(format!("Query {}", step), recorded, "SELECT".to_string());
        }

        let summary = profiler.get_statistics();

        assert_eq!(summary.total_queries, 10);
        assert!(summary.avg_execution_time_ms > 0.0);
        assert!(summary.avg_triples_per_query > 0.0);
    }
}