1use crate::model::pattern::TriplePattern;
7use crate::model::Triple;
8use crate::query::algebra::{AlgebraTriplePattern, TermPattern};
9use crate::OxirsError;
10use scirs2_core::metrics::{Counter, Histogram, MetricsRegistry};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant};
16
17#[derive(Clone)]
22pub struct GraphStatistics {
23 total_triples: Arc<AtomicU64>,
25 distinct_subjects: Arc<AtomicU64>,
27 distinct_predicates: Arc<AtomicU64>,
29 distinct_objects: Arc<AtomicU64>,
31 predicate_stats: Arc<RwLock<HashMap<String, PredicateStatistics>>>,
33 pattern_selectivity: Arc<RwLock<HashMap<String, SelectivityInfo>>>,
35 #[allow(dead_code)]
37 metrics: Arc<MetricsRegistry>,
38 last_updated: Arc<RwLock<Instant>>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct PredicateStatistics {
45 pub count: u64,
47 pub distinct_subjects: u64,
49 pub distinct_objects: u64,
51 pub avg_objects_per_subject: f64,
53 pub avg_subjects_per_object: f64,
55 pub min_cardinality: u64,
57 pub max_cardinality: u64,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct SelectivityInfo {
64 pub pattern_signature: String,
66 pub observed_selectivity: f64,
68 pub observation_count: u64,
70 pub last_observed_ms: u128,
72 pub estimated_result_size: u64,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct QueryExecutionStats {
79 pub query_signature: String,
81 pub execution_time: Duration,
83 pub estimated_time: Duration,
85 pub actual_results: u64,
87 pub estimated_results: u64,
89 pub memory_bytes: u64,
91 pub cpu_time: Duration,
93}
94
95impl GraphStatistics {
96 pub fn new() -> Self {
98 let metrics = MetricsRegistry::new();
99
100 Self {
101 total_triples: Arc::new(AtomicU64::new(0)),
102 distinct_subjects: Arc::new(AtomicU64::new(0)),
103 distinct_predicates: Arc::new(AtomicU64::new(0)),
104 distinct_objects: Arc::new(AtomicU64::new(0)),
105 predicate_stats: Arc::new(RwLock::new(HashMap::new())),
106 pattern_selectivity: Arc::new(RwLock::new(HashMap::new())),
107 metrics: Arc::new(metrics),
108 last_updated: Arc::new(RwLock::new(Instant::now())),
109 }
110 }
111
112 pub fn record_insert(&self, triple: &Triple) -> Result<(), OxirsError> {
114 self.total_triples.fetch_add(1, Ordering::Relaxed);
115
116 if let crate::model::Predicate::NamedNode(predicate) = triple.predicate() {
118 let pred_str = predicate.as_str().to_string();
119
120 let mut stats = self.predicate_stats.write().map_err(|e| {
121 OxirsError::Store(format!("Failed to write predicate stats: {}", e))
122 })?;
123
124 let pred_stat = stats
125 .entry(pred_str.clone())
126 .or_insert_with(|| PredicateStatistics {
127 count: 0,
128 distinct_subjects: 0,
129 distinct_objects: 0,
130 avg_objects_per_subject: 0.0,
131 avg_subjects_per_object: 0.0,
132 min_cardinality: u64::MAX,
133 max_cardinality: 0,
134 });
135
136 pred_stat.count += 1;
137
138 let counter = Counter::new("graph.triples.total".to_string());
140 counter.add(1);
141
142 let pred_counter = Counter::new(format!("graph.predicate.{}.count", pred_str));
143 pred_counter.add(1);
144 }
145
146 if let Ok(mut last) = self.last_updated.write() {
148 *last = Instant::now();
149 }
150
151 Ok(())
152 }
153
154 pub fn record_remove(&self, triple: &Triple) -> Result<(), OxirsError> {
156 let current = self.total_triples.load(Ordering::Relaxed);
157 if current > 0 {
158 self.total_triples.fetch_sub(1, Ordering::Relaxed);
159 }
160
161 if let crate::model::Predicate::NamedNode(predicate) = triple.predicate() {
163 let pred_str = predicate.as_str().to_string();
164
165 let mut stats = self.predicate_stats.write().map_err(|e| {
166 OxirsError::Store(format!("Failed to write predicate stats: {}", e))
167 })?;
168
169 if let Some(pred_stat) = stats.get_mut(&pred_str) {
170 if pred_stat.count > 0 {
171 pred_stat.count -= 1;
172 }
173 }
174
175 let counter = Counter::new("graph.triples.removed".to_string());
177 counter.add(1);
178 }
179
180 if let Ok(mut last) = self.last_updated.write() {
182 *last = Instant::now();
183 }
184
185 Ok(())
186 }
187
188 pub fn total_triples(&self) -> u64 {
190 self.total_triples.load(Ordering::Relaxed)
191 }
192
193 pub fn get_predicate_stats(&self, predicate: &str) -> Option<PredicateStatistics> {
195 self.predicate_stats.read().ok()?.get(predicate).cloned()
196 }
197
198 pub fn estimate_pattern_cardinality(&self, pattern: &TriplePattern) -> u64 {
200 let total = self.total_triples() as f64;
201 if total == 0.0 {
202 return 0;
203 }
204
205 let mut selectivity = 1.0;
206
207 if pattern.subject().is_some() {
209 selectivity *= 0.001; } else {
211 selectivity *= 0.5;
212 }
213
214 if let Some(crate::model::pattern::PredicatePattern::NamedNode(pred)) = pattern.predicate()
215 {
216 if let Some(stats) = self.get_predicate_stats(pred.as_str()) {
218 let pred_selectivity = stats.count as f64 / total;
219 selectivity *= pred_selectivity;
220 } else {
221 selectivity *= 0.1; }
223 } else {
224 selectivity *= 0.5;
225 }
226
227 if pattern.object().is_some() {
228 selectivity *= 0.001; } else {
230 selectivity *= 0.5;
231 }
232
233 (total * selectivity).max(1.0) as u64
234 }
235
236 pub fn estimate_algebra_pattern_cardinality(&self, pattern: &AlgebraTriplePattern) -> u64 {
238 let total = self.total_triples() as f64;
239 if total == 0.0 {
240 return 0;
241 }
242
243 let mut selectivity = 1.0;
244
245 match &pattern.subject {
247 TermPattern::Variable(_) => selectivity *= 0.5,
248 _ => selectivity *= 0.001,
249 }
250
251 match &pattern.predicate {
252 TermPattern::Variable(_) => selectivity *= 0.5,
253 TermPattern::NamedNode(pred) => {
254 if let Some(stats) = self.get_predicate_stats(pred.as_str()) {
255 selectivity *= stats.count as f64 / total;
256 } else {
257 selectivity *= 0.1;
258 }
259 }
260 _ => selectivity *= 0.1,
261 }
262
263 match &pattern.object {
264 TermPattern::Variable(_) => selectivity *= 0.5,
265 _ => selectivity *= 0.001,
266 }
267
268 (total * selectivity).max(1.0) as u64
269 }
270
271 pub fn record_query_execution(&self, stats: QueryExecutionStats) -> Result<(), OxirsError> {
273 let exec_counter = Counter::new("query.execution.total".to_string());
275 exec_counter.add(1);
276
277 let time_counter = Counter::new("query.execution.time_ms".to_string());
278 time_counter.add(stats.execution_time.as_millis() as u64);
279
280 let accuracy_ratio = if stats.estimated_results > 0 {
281 stats.actual_results as f64 / stats.estimated_results as f64
282 } else {
283 1.0
284 };
285
286 let histogram = Histogram::new("query.estimation.accuracy".to_string());
287 histogram.observe(accuracy_ratio);
288
289 let observed_selectivity = if self.total_triples() > 0 {
291 stats.actual_results as f64 / self.total_triples() as f64
292 } else {
293 0.0
294 };
295
296 let mut pattern_sel = self
297 .pattern_selectivity
298 .write()
299 .map_err(|e| OxirsError::Query(format!("Failed to write selectivity: {}", e)))?;
300
301 let selectivity_info = pattern_sel
302 .entry(stats.query_signature.clone())
303 .or_insert_with(|| SelectivityInfo {
304 pattern_signature: stats.query_signature.clone(),
305 observed_selectivity: 0.0,
306 observation_count: 0,
307 last_observed_ms: 0,
308 estimated_result_size: 0,
309 });
310
311 let alpha = 0.3; selectivity_info.observed_selectivity =
314 alpha * observed_selectivity + (1.0 - alpha) * selectivity_info.observed_selectivity;
315 selectivity_info.observation_count += 1;
316 selectivity_info.last_observed_ms = Instant::now().elapsed().as_millis();
317 selectivity_info.estimated_result_size = stats.actual_results;
318
319 Ok(())
320 }
321
322 pub fn get_learned_selectivity(&self, pattern_signature: &str) -> Option<f64> {
324 self.pattern_selectivity
325 .read()
326 .ok()?
327 .get(pattern_signature)
328 .map(|info| info.observed_selectivity)
329 }
330
331 pub fn export_to_json(&self) -> Result<String, OxirsError> {
333 let stats = self
334 .predicate_stats
335 .read()
336 .map_err(|e| OxirsError::Serialize(format!("Failed to read stats: {}", e)))?;
337
338 serde_json::to_string_pretty(&*stats).map_err(|e| OxirsError::Serialize(e.to_string()))
339 }
340
341 pub fn import_from_json(&self, json: &str) -> Result<(), OxirsError> {
343 let stats: HashMap<String, PredicateStatistics> =
344 serde_json::from_str(json).map_err(|e| OxirsError::Parse(e.to_string()))?;
345
346 let mut current_stats = self
347 .predicate_stats
348 .write()
349 .map_err(|e| OxirsError::Store(format!("Failed to write stats: {}", e)))?;
350
351 *current_stats = stats;
352
353 let total: u64 = current_stats.values().map(|s| s.count).sum();
355 self.total_triples.store(total, Ordering::Relaxed);
356
357 Ok(())
358 }
359
360 pub fn recompute_from_triples(&self, triples: &[Triple]) -> Result<(), OxirsError> {
362 tracing::info!("Recomputing statistics from {} triples", triples.len());
363
364 let start = Instant::now();
365
366 self.total_triples
368 .store(triples.len() as u64, Ordering::Relaxed);
369
370 let mut predicate_counts: HashMap<String, PredicateStatistics> = HashMap::new();
371 let mut subject_counts: HashMap<String, u64> = HashMap::new();
372 let mut object_counts: HashMap<String, u64> = HashMap::new();
373
374 for triple in triples {
376 if let crate::model::Predicate::NamedNode(pred) = triple.predicate() {
377 let pred_str = pred.as_str().to_string();
378
379 let stat = predicate_counts.entry(pred_str.clone()).or_insert_with(|| {
380 PredicateStatistics {
381 count: 0,
382 distinct_subjects: 0,
383 distinct_objects: 0,
384 avg_objects_per_subject: 0.0,
385 avg_subjects_per_object: 0.0,
386 min_cardinality: u64::MAX,
387 max_cardinality: 0,
388 }
389 });
390
391 stat.count += 1;
392
393 if let crate::model::Subject::NamedNode(subj) = triple.subject() {
395 *subject_counts
396 .entry(format!("{}:{}", pred_str, subj.as_str()))
397 .or_insert(0) += 1;
398 }
399
400 if let crate::model::Object::NamedNode(obj) = triple.object() {
401 *object_counts
402 .entry(format!("{}:{}", pred_str, obj.as_str()))
403 .or_insert(0) += 1;
404 }
405 }
406 }
407
408 for (pred_str, stat) in predicate_counts.iter_mut() {
410 let prefix = format!("{}:", pred_str);
411
412 stat.distinct_subjects = subject_counts
413 .keys()
414 .filter(|k| k.starts_with(&prefix))
415 .count() as u64;
416
417 stat.distinct_objects = object_counts
418 .keys()
419 .filter(|k| k.starts_with(&prefix))
420 .count() as u64;
421
422 if stat.distinct_subjects > 0 {
423 stat.avg_objects_per_subject = stat.count as f64 / stat.distinct_subjects as f64;
424 }
425
426 if stat.distinct_objects > 0 {
427 stat.avg_subjects_per_object = stat.count as f64 / stat.distinct_objects as f64;
428 }
429 }
430
431 let mut stats = self
433 .predicate_stats
434 .write()
435 .map_err(|e| OxirsError::Store(format!("Failed to write stats: {}", e)))?;
436 *stats = predicate_counts;
437
438 self.distinct_predicates
440 .store(stats.len() as u64, Ordering::Relaxed);
441
442 let elapsed = start.elapsed();
443 tracing::info!("Statistics recomputation completed in {:?}", elapsed);
444
445 Ok(())
446 }
447
448 pub fn summary(&self) -> StatisticsSummary {
450 StatisticsSummary {
451 total_triples: self.total_triples(),
452 distinct_subjects: self.distinct_subjects.load(Ordering::Relaxed),
453 distinct_predicates: self.distinct_predicates.load(Ordering::Relaxed),
454 distinct_objects: self.distinct_objects.load(Ordering::Relaxed),
455 predicate_count: self
456 .predicate_stats
457 .read()
458 .ok()
459 .map(|s| s.len())
460 .unwrap_or(0),
461 last_updated: self.last_updated.read().ok().map(|t| *t),
462 }
463 }
464}
465
466impl Default for GraphStatistics {
467 fn default() -> Self {
468 Self::new()
469 }
470}
471
472#[derive(Debug, Clone, Serialize, Deserialize)]
474pub struct StatisticsSummary {
475 pub total_triples: u64,
476 pub distinct_subjects: u64,
477 pub distinct_predicates: u64,
478 pub distinct_objects: u64,
479 pub predicate_count: usize,
480 #[serde(skip)]
481 pub last_updated: Option<Instant>,
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Predicate IRI shared by every fixture triple.
    const PREDICATE_IRI: &str = "http://example.org/p";
    /// Subject IRI of the single-triple fixture.
    const SUBJECT_IRI: &str = "http://example.org/s";

    /// Builds a named node from a fixture IRI, panicking if it is invalid.
    fn named(iri: &str) -> NamedNode {
        NamedNode::new(iri).expect("valid IRI")
    }

    /// Builds `<subject_iri> <PREDICATE_IRI> "literal"`.
    fn triple_with(subject_iri: &str, literal: &str) -> Triple {
        Triple::new(
            named(subject_iri),
            named(PREDICATE_IRI),
            Literal::new(literal),
        )
    }

    /// The single triple most tests insert.
    fn sample_triple() -> Triple {
        triple_with(SUBJECT_IRI, "value")
    }

    #[test]
    fn test_statistics_creation() {
        let stats = GraphStatistics::new();
        assert_eq!(stats.total_triples(), 0);
    }

    #[test]
    fn test_record_insert() {
        let stats = GraphStatistics::new();

        stats
            .record_insert(&sample_triple())
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 1);
    }

    #[test]
    fn test_record_remove() {
        let stats = GraphStatistics::new();
        let triple = sample_triple();

        stats
            .record_insert(&triple)
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 1);

        stats
            .record_remove(&triple)
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 0);

        // Removing from an already empty graph must saturate, not underflow.
        stats
            .record_remove(&triple)
            .expect("operation should succeed");
        assert_eq!(stats.total_triples(), 0);
    }

    #[test]
    fn test_predicate_statistics() {
        let stats = GraphStatistics::new();

        let predicate = named(PREDICATE_IRI);
        let triple = Triple::new(named(SUBJECT_IRI), predicate.clone(), Literal::new("value"));

        stats
            .record_insert(&triple)
            .expect("operation should succeed");

        let pred_stats = stats.get_predicate_stats(predicate.as_str());
        assert!(pred_stats.is_some());
        assert_eq!(pred_stats.expect("predicate stats should exist").count, 1);
    }

    #[test]
    fn test_pattern_cardinality_estimation() {
        let stats = GraphStatistics::new();

        for i in 0..100 {
            let triple = triple_with(
                &format!("http://example.org/s{}", i),
                &format!("value{}", i),
            );
            stats
                .record_insert(&triple)
                .expect("operation should succeed");
        }

        // ?s <p> ?o — only the predicate is bound.
        let pattern = TriplePattern::new(
            None,
            Some(crate::model::pattern::PredicatePattern::NamedNode(named(
                PREDICATE_IRI,
            ))),
            None,
        );

        let estimated = stats.estimate_pattern_cardinality(&pattern);
        assert!(estimated > 0);
        assert!(estimated <= 100);
    }

    #[test]
    fn test_query_execution_recording() {
        let stats = GraphStatistics::new();
        let signature = "SELECT ?s WHERE { ?s ?p ?o }";

        let exec_stats = QueryExecutionStats {
            query_signature: signature.to_string(),
            execution_time: Duration::from_millis(50),
            estimated_time: Duration::from_millis(100),
            actual_results: 42,
            estimated_results: 50,
            memory_bytes: 1024 * 1024,
            cpu_time: Duration::from_millis(30),
        };

        stats
            .record_query_execution(exec_stats)
            .expect("operation should succeed");

        let learned = stats.get_learned_selectivity(signature);
        assert!(learned.is_some());
        // The graph is empty, so the observed selectivity is defined as 0.0.
        assert_eq!(learned, Some(0.0));

        // Signatures that were never recorded have no learned value.
        assert!(stats.get_learned_selectivity("ASK { ?s ?p ?o }").is_none());
    }

    #[test]
    fn test_statistics_export_import() {
        let stats = GraphStatistics::new();

        stats
            .record_insert(&sample_triple())
            .expect("operation should succeed");

        let json = stats.export_to_json().expect("operation should succeed");
        assert!(!json.is_empty());

        let stats2 = GraphStatistics::new();
        stats2
            .import_from_json(&json)
            .expect("operation should succeed");

        assert_eq!(stats2.total_triples(), 1);

        // The per-predicate entry must survive the round trip.
        let restored = stats2
            .get_predicate_stats(named(PREDICATE_IRI).as_str())
            .expect("imported predicate stats should exist");
        assert_eq!(restored.count, 1);
    }

    #[test]
    fn test_statistics_summary() {
        let stats = GraphStatistics::new();

        let summary = stats.summary();
        assert_eq!(summary.total_triples, 0);
        assert_eq!(summary.predicate_count, 0);
    }
}