1use crate::query::algebra::{AlgebraTriplePattern, GraphPattern, TermPattern};
8use std::collections::HashMap;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, RwLock};
11
12#[derive(Debug)]
14pub struct AdvancedStatisticsCollector {
15 subject_histogram: Arc<RwLock<CardinalityHistogram>>,
17 predicate_histogram: Arc<RwLock<CardinalityHistogram>>,
19 object_histogram: Arc<RwLock<CardinalityHistogram>>,
21 join_selectivity: Arc<RwLock<JoinSelectivityEstimator>>,
23 execution_history: Arc<RwLock<ExecutionHistory>>,
25 queries_analyzed: AtomicU64,
27}
28
29impl AdvancedStatisticsCollector {
30 pub fn new() -> Self {
32 Self {
33 subject_histogram: Arc::new(RwLock::new(CardinalityHistogram::new())),
34 predicate_histogram: Arc::new(RwLock::new(CardinalityHistogram::new())),
35 object_histogram: Arc::new(RwLock::new(CardinalityHistogram::new())),
36 join_selectivity: Arc::new(RwLock::new(JoinSelectivityEstimator::new())),
37 execution_history: Arc::new(RwLock::new(ExecutionHistory::new(1000))),
38 queries_analyzed: AtomicU64::new(0),
39 }
40 }
41
42 pub fn record_pattern_execution(
44 &self,
45 pattern: &AlgebraTriplePattern,
46 actual_cardinality: usize,
47 execution_time_ms: u64,
48 ) {
49 self.update_histograms(pattern, actual_cardinality);
51
52 let mut history = self
54 .execution_history
55 .write()
56 .expect("execution_history lock poisoned");
57 history.record(PatternExecution {
58 pattern: pattern.clone(),
59 cardinality: actual_cardinality,
60 execution_time_ms,
61 timestamp: std::time::SystemTime::now(),
62 });
63
64 self.queries_analyzed.fetch_add(1, Ordering::Relaxed);
65 }
66
67 fn update_histograms(&self, pattern: &AlgebraTriplePattern, cardinality: usize) {
69 if let TermPattern::NamedNode(node) = &pattern.subject {
71 let mut hist = self
72 .subject_histogram
73 .write()
74 .expect("subject_histogram lock poisoned");
75 hist.record(node.as_str(), cardinality);
76 }
77
78 if let TermPattern::NamedNode(node) = &pattern.predicate {
80 let mut hist = self
81 .predicate_histogram
82 .write()
83 .expect("predicate_histogram lock poisoned");
84 hist.record(node.as_str(), cardinality);
85 }
86
87 if let TermPattern::NamedNode(node) = &pattern.object {
89 let mut hist = self
90 .object_histogram
91 .write()
92 .expect("object_histogram lock poisoned");
93 hist.record(node.as_str(), cardinality);
94 }
95 }
96
97 pub fn estimate_cardinality(&self, pattern: &AlgebraTriplePattern) -> Option<usize> {
99 let subject_est = if let TermPattern::NamedNode(node) = &pattern.subject {
101 self.subject_histogram
102 .read()
103 .expect("subject_histogram lock poisoned")
104 .estimate(node.as_str())
105 } else {
106 None
107 };
108
109 let predicate_est = if let TermPattern::NamedNode(node) = &pattern.predicate {
110 self.predicate_histogram
111 .read()
112 .expect("predicate_histogram lock poisoned")
113 .estimate(node.as_str())
114 } else {
115 None
116 };
117
118 let object_est = if let TermPattern::NamedNode(node) = &pattern.object {
119 self.object_histogram
120 .read()
121 .expect("object_histogram lock poisoned")
122 .estimate(node.as_str())
123 } else {
124 None
125 };
126
127 [subject_est, predicate_est, object_est]
129 .iter()
130 .filter_map(|&x| x)
131 .min()
132 }
133
134 pub fn record_join_execution(
136 &self,
137 _left_pattern: &GraphPattern,
138 _right_pattern: &GraphPattern,
139 left_cardinality: usize,
140 right_cardinality: usize,
141 result_cardinality: usize,
142 ) {
143 let mut estimator = self
144 .join_selectivity
145 .write()
146 .expect("join_selectivity lock poisoned");
147 estimator.record_join(left_cardinality, right_cardinality, result_cardinality);
148 }
149
150 pub fn estimate_join_selectivity(&self, left_card: usize, right_card: usize) -> f64 {
152 self.join_selectivity
153 .read()
154 .expect("join_selectivity lock poisoned")
155 .estimate(left_card, right_card)
156 }
157
158 pub fn get_pattern_history(&self, pattern: &AlgebraTriplePattern) -> Vec<PatternExecution> {
160 self.execution_history
161 .read()
162 .expect("execution_history lock poisoned")
163 .get_similar_patterns(pattern)
164 }
165
166 pub fn get_statistics(&self) -> AdvancedStatistics {
168 AdvancedStatistics {
169 queries_analyzed: self.queries_analyzed.load(Ordering::Relaxed),
170 subject_histogram_size: self
171 .subject_histogram
172 .read()
173 .expect("subject_histogram lock poisoned")
174 .size(),
175 predicate_histogram_size: self
176 .predicate_histogram
177 .read()
178 .expect("predicate_histogram lock poisoned")
179 .size(),
180 object_histogram_size: self
181 .object_histogram
182 .read()
183 .expect("object_histogram lock poisoned")
184 .size(),
185 join_samples: self
186 .join_selectivity
187 .read()
188 .expect("join_selectivity lock poisoned")
189 .sample_count(),
190 history_size: self
191 .execution_history
192 .read()
193 .expect("execution_history lock poisoned")
194 .size(),
195 }
196 }
197
198 pub fn clear(&self) {
200 self.subject_histogram
201 .write()
202 .expect("subject_histogram lock poisoned")
203 .clear();
204 self.predicate_histogram
205 .write()
206 .expect("predicate_histogram lock poisoned")
207 .clear();
208 self.object_histogram
209 .write()
210 .expect("object_histogram lock poisoned")
211 .clear();
212 self.join_selectivity
213 .write()
214 .expect("join_selectivity lock poisoned")
215 .clear();
216 self.execution_history
217 .write()
218 .expect("execution_history lock poisoned")
219 .clear();
220 self.queries_analyzed.store(0, Ordering::Relaxed);
221 }
222}
223
224impl Default for AdvancedStatisticsCollector {
225 fn default() -> Self {
226 Self::new()
227 }
228}
229
/// Per-term sliding window of observed cardinalities.
///
/// For every term string the histogram keeps the most recent `max_samples`
/// observations and answers estimates with the median of that window.
#[derive(Debug)]
struct CardinalityHistogram {
    /// Recent cardinality samples per term, oldest at the front.
    ///
    /// A `VecDeque` is used so evicting the oldest sample is O(1).
    data: HashMap<String, std::collections::VecDeque<usize>>,
    /// Maximum number of samples retained per term.
    max_samples: usize,
}

impl CardinalityHistogram {
    /// Creates an empty histogram that keeps the last 100 samples per term.
    fn new() -> Self {
        Self {
            data: HashMap::new(),
            max_samples: 100,
        }
    }

    /// Appends `value` to `window` and drops the oldest sample if the window
    /// then holds more than `cap` entries.
    fn push_bounded(window: &mut std::collections::VecDeque<usize>, value: usize, cap: usize) {
        window.push_back(value);
        if window.len() > cap {
            window.pop_front();
        }
    }

    /// Records one observed `cardinality` for `term`.
    ///
    /// The key string is only allocated the first time a term is seen;
    /// subsequent records for the same term reuse the existing entry.
    fn record(&mut self, term: &str, cardinality: usize) {
        let cap = self.max_samples;

        if let Some(window) = self.data.get_mut(term) {
            Self::push_bounded(window, cardinality, cap);
        } else {
            let mut window = std::collections::VecDeque::new();
            Self::push_bounded(&mut window, cardinality, cap);
            self.data.insert(term.to_owned(), window);
        }
    }

    /// Returns the median of the samples recorded for `term`.
    ///
    /// For an even number of samples the upper of the two middle values is
    /// returned. Returns `None` if the term is unknown or has no samples.
    fn estimate(&self, term: &str) -> Option<usize> {
        let window = self.data.get(term)?;
        if window.is_empty() {
            return None;
        }

        // Selecting the middle element is enough; a full sort is not needed.
        let mut scratch: Vec<usize> = window.iter().copied().collect();
        let mid = scratch.len() / 2;
        let (_, median, _) = scratch.select_nth_unstable(mid);
        Some(*median)
    }

    /// Returns the number of distinct terms that have an entry.
    fn size(&self) -> usize {
        self.data.len()
    }

    /// Removes all terms and their samples.
    fn clear(&mut self) {
        self.data.clear();
    }
}
278
279#[derive(Debug)]
281struct JoinSelectivityEstimator {
282 observations: Vec<JoinObservation>,
284 max_observations: usize,
286}
287
288#[derive(Debug, Clone)]
289#[allow(dead_code)]
290struct JoinObservation {
291 left_cardinality: usize,
292 right_cardinality: usize,
293 result_cardinality: usize,
294 selectivity: f64,
295}
296
297impl JoinSelectivityEstimator {
298 fn new() -> Self {
299 Self {
300 observations: Vec::new(),
301 max_observations: 1000,
302 }
303 }
304
305 fn record_join(&mut self, left_card: usize, right_card: usize, result_card: usize) {
306 let product = (left_card as f64) * (right_card as f64);
307 let selectivity = if product > 0.0 {
308 (result_card as f64) / product
309 } else {
310 0.0
311 };
312
313 self.observations.push(JoinObservation {
314 left_cardinality: left_card,
315 right_cardinality: right_card,
316 result_cardinality: result_card,
317 selectivity,
318 });
319
320 if self.observations.len() > self.max_observations {
322 self.observations.remove(0);
323 }
324 }
325
326 fn estimate(&self, left_card: usize, right_card: usize) -> f64 {
327 if self.observations.is_empty() {
328 return 0.1; }
330
331 let similar: Vec<f64> = self
333 .observations
334 .iter()
335 .filter(|obs| {
336 let left_ratio = (obs.left_cardinality as f64) / (left_card.max(1) as f64);
337 let right_ratio = (obs.right_cardinality as f64) / (right_card.max(1) as f64);
338 (0.5..=2.0).contains(&left_ratio) && (0.5..=2.0).contains(&right_ratio)
339 })
340 .map(|obs| obs.selectivity)
341 .collect();
342
343 if similar.is_empty() {
344 let avg: f64 = self.observations.iter().map(|o| o.selectivity).sum::<f64>()
346 / self.observations.len() as f64;
347 avg
348 } else {
349 let mut sorted = similar;
351 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
352 sorted[sorted.len() / 2]
353 }
354 }
355
356 fn sample_count(&self) -> usize {
357 self.observations.len()
358 }
359
360 fn clear(&mut self) {
361 self.observations.clear();
362 }
363}
364
365#[derive(Debug)]
367struct ExecutionHistory {
368 executions: Vec<PatternExecution>,
370 max_size: usize,
372}
373
374#[derive(Debug, Clone)]
375pub struct PatternExecution {
376 pub pattern: AlgebraTriplePattern,
377 pub cardinality: usize,
378 pub execution_time_ms: u64,
379 pub timestamp: std::time::SystemTime,
380}
381
382impl ExecutionHistory {
383 fn new(max_size: usize) -> Self {
384 Self {
385 executions: Vec::new(),
386 max_size,
387 }
388 }
389
390 fn record(&mut self, execution: PatternExecution) {
391 self.executions.push(execution);
392
393 if self.executions.len() > self.max_size {
395 self.executions.remove(0);
396 }
397 }
398
399 fn get_similar_patterns(&self, pattern: &AlgebraTriplePattern) -> Vec<PatternExecution> {
400 self.executions
401 .iter()
402 .filter(|exec| Self::patterns_similar(&exec.pattern, pattern))
403 .cloned()
404 .collect()
405 }
406
407 fn patterns_similar(p1: &AlgebraTriplePattern, p2: &AlgebraTriplePattern) -> bool {
408 Self::term_pattern_type(&p1.subject) == Self::term_pattern_type(&p2.subject)
410 && Self::term_pattern_type(&p1.predicate) == Self::term_pattern_type(&p2.predicate)
411 && Self::term_pattern_type(&p1.object) == Self::term_pattern_type(&p2.object)
412 }
413
414 fn term_pattern_type(term: &TermPattern) -> &'static str {
415 match term {
416 TermPattern::Variable(_) => "var",
417 TermPattern::NamedNode(_) => "node",
418 TermPattern::BlankNode(_) => "blank",
419 TermPattern::Literal(_) => "literal",
420 TermPattern::QuotedTriple(_) => "quoted",
421 }
422 }
423
424 fn size(&self) -> usize {
425 self.executions.len()
426 }
427
428 fn clear(&mut self) {
429 self.executions.clear();
430 }
431}
432
/// Snapshot of the counters and component sizes of an
/// `AdvancedStatisticsCollector`.
///
/// `Default` yields an all-zero snapshot, and snapshots can be compared for
/// equality.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AdvancedStatistics {
    /// Value of the collector's analyzed-queries counter.
    pub queries_analyzed: u64,
    /// Size reported by the subject histogram.
    pub subject_histogram_size: usize,
    /// Size reported by the predicate histogram.
    pub predicate_histogram_size: usize,
    /// Size reported by the object histogram.
    pub object_histogram_size: usize,
    /// Number of join observations held by the selectivity estimator.
    pub join_samples: usize,
    /// Number of entries held by the execution history.
    pub history_size: usize,
}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{NamedNode, Variable};

    /// Builds a pattern with variable subject `s`, the FOAF `name` IRI as
    /// predicate, and variable object `o`.
    fn create_test_pattern() -> AlgebraTriplePattern {
        let subject = TermPattern::Variable(Variable::new("s").expect("valid variable name"));
        let predicate = TermPattern::NamedNode(
            NamedNode::new("http://xmlns.com/foaf/0.1/name").expect("valid IRI"),
        );
        let object = TermPattern::Variable(Variable::new("o").expect("valid variable name"));

        AlgebraTriplePattern {
            subject,
            predicate,
            object,
        }
    }

    /// A fresh collector reports zero analyzed queries and an empty history.
    #[test]
    fn test_collector_creation() {
        let stats = AdvancedStatisticsCollector::new().get_statistics();

        assert_eq!(stats.queries_analyzed, 0);
        assert_eq!(stats.history_size, 0);
    }

    /// Recording one execution bumps both the counter and the history size.
    #[test]
    fn test_pattern_recording() {
        let collector = AdvancedStatisticsCollector::new();
        collector.record_pattern_execution(&create_test_pattern(), 100, 50);

        let stats = collector.get_statistics();
        assert_eq!(stats.queries_analyzed, 1);
        assert_eq!(stats.history_size, 1);
    }

    /// After samples 100, 200, ..., 1000 for the same predicate, the estimate
    /// lies in the middle of the observed range.
    #[test]
    fn test_histogram_estimation() {
        let collector = AdvancedStatisticsCollector::new();
        let foaf_name = NamedNode::new("http://xmlns.com/foaf/0.1/name").expect("valid IRI");

        let pattern = AlgebraTriplePattern {
            subject: TermPattern::Variable(Variable::new("s").expect("valid variable name")),
            predicate: TermPattern::NamedNode(foaf_name),
            object: TermPattern::Variable(Variable::new("o").expect("valid variable name")),
        };

        for cardinality in (100..=1000).step_by(100) {
            collector.record_pattern_execution(&pattern, cardinality, 10);
        }

        let estimate = collector.estimate_cardinality(&pattern);
        assert!(estimate.is_some());
        let est = estimate.expect("estimate should be available");
        assert!((400..=700).contains(&est));
    }

    /// Two recorded joins with similar input sizes drive the estimate for an
    /// in-between query.
    #[test]
    fn test_join_selectivity() {
        let collector = AdvancedStatisticsCollector::new();

        let observed = [(1000, 1000, 100), (2000, 2000, 400)];
        for &(left, right, result) in &observed {
            collector.record_join_execution(
                &GraphPattern::Bgp(vec![]),
                &GraphPattern::Bgp(vec![]),
                left,
                right,
                result,
            );
        }

        let selectivity = collector.estimate_join_selectivity(1500, 1500);
        assert!(selectivity > 0.00005 && selectivity < 0.002);
    }

    /// The execution history never grows past its limit.
    #[test]
    fn test_history_limit() {
        let collector = AdvancedStatisticsCollector::new();
        let pattern = create_test_pattern();

        (0..1500).for_each(|_| collector.record_pattern_execution(&pattern, 100, 10));

        let stats = collector.get_statistics();
        assert!(stats.history_size <= 1000);
    }

    /// `clear` resets the counter and empties the history.
    #[test]
    fn test_clear_statistics() {
        let collector = AdvancedStatisticsCollector::new();

        collector.record_pattern_execution(&create_test_pattern(), 100, 10);
        collector.clear();

        let stats = collector.get_statistics();
        assert_eq!(stats.queries_analyzed, 0);
        assert_eq!(stats.history_size, 0);
    }
}