1use crate::model::{Object, Predicate, Subject, Triple};
7use crate::store::{IndexType, IndexedGraph};
8use crate::OxirsError;
9use dashmap::DashMap;
10use parking_lot::{Mutex, RwLock};
11use std::collections::{HashMap, VecDeque};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
/// Shape of a triple-pattern lookup, classified by which of the subject,
/// predicate and object components are bound.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryPattern {
    /// Only the subject is bound.
    SubjectQuery,
    /// Only the predicate is bound.
    PredicateQuery,
    /// Only the object is bound.
    ObjectQuery,
    /// Subject and predicate are bound; the object is free.
    SubjectPredicate,
    /// Subject and object are bound; the predicate is free.
    SubjectObject,
    /// Predicate and object are bound; the subject is free.
    PredicateObject,
    /// All three components are bound.
    SpecificTriple,
    /// Nothing is bound.
    FullScan,
}
35
36impl QueryPattern {
37 pub fn from_components(
39 subject: Option<&Subject>,
40 predicate: Option<&Predicate>,
41 object: Option<&Object>,
42 ) -> Self {
43 match (subject.is_some(), predicate.is_some(), object.is_some()) {
44 (true, true, true) => QueryPattern::SpecificTriple,
45 (true, true, false) => QueryPattern::SubjectPredicate,
46 (true, false, true) => QueryPattern::SubjectObject,
47 (false, true, true) => QueryPattern::PredicateObject,
48 (true, false, false) => QueryPattern::SubjectQuery,
49 (false, true, false) => QueryPattern::PredicateQuery,
50 (false, false, true) => QueryPattern::ObjectQuery,
51 (false, false, false) => QueryPattern::FullScan,
52 }
53 }
54
55 pub fn recommended_index(&self) -> Option<IndexType> {
57 match self {
58 QueryPattern::SubjectQuery | QueryPattern::SubjectPredicate => Some(IndexType::SPO),
59 QueryPattern::PredicateQuery | QueryPattern::PredicateObject => Some(IndexType::POS),
60 QueryPattern::ObjectQuery | QueryPattern::SubjectObject => Some(IndexType::OSP),
61 QueryPattern::SpecificTriple => Some(IndexType::SPO), QueryPattern::FullScan => None, }
64 }
65}
66
/// Running statistics collected for one [`QueryPattern`].
#[derive(Debug, Clone)]
pub struct PatternStats {
    /// Number of queries recorded for the pattern.
    pub query_count: u64,
    /// Accumulated execution time of the recorded queries.
    pub total_time: Duration,
    /// Running mean of the number of results per query.
    pub avg_result_size: f64,
    /// When the pattern was last recorded (creation time for a fresh entry).
    pub last_queried: Instant,
    /// Smoothed query rate in queries per second.
    pub query_frequency: f64,
}

impl Default for PatternStats {
    /// Creates empty statistics stamped with the current time.
    fn default() -> Self {
        let created_at = Instant::now();
        Self {
            last_queried: created_at,
            total_time: Duration::ZERO,
            query_count: 0,
            avg_result_size: 0.0,
            query_frequency: 0.0,
        }
    }
}
93
/// Tuning knobs for [`AdaptiveIndexManager`].
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
    /// Minimum number of recorded queries before a pattern is considered for
    /// an adaptive index.
    pub min_queries_for_index: u64,
    /// Minimum smoothed query frequency before a pattern is considered for an
    /// adaptive index. Half of this value is the eviction threshold.
    pub min_frequency_for_index: f64,
    /// Upper bound on the number of adaptive indexes kept at once.
    pub max_adaptive_indexes: usize,
    /// How long entries are retained in the query history.
    pub analysis_window: Duration,
    /// Minimum time between automatically triggered maintenance passes.
    pub maintenance_interval: Duration,
    /// Minimum estimated benefit required to create an index.
    pub index_cost_threshold: f64,
}

impl Default for AdaptiveConfig {
    /// Defaults: 100 queries, frequency 0.1, at most 5 indexes, a 300 second
    /// analysis window, maintenance every 60 seconds, benefit threshold 0.5.
    fn default() -> Self {
        let analysis_window = Duration::from_secs(300);
        let maintenance_interval = Duration::from_secs(60);
        Self {
            analysis_window,
            maintenance_interval,
            max_adaptive_indexes: 5,
            min_queries_for_index: 100,
            min_frequency_for_index: 0.1,
            index_cost_threshold: 0.5,
        }
    }
}
123
124pub struct AdaptiveIndexManager {
126 base_graph: Arc<RwLock<IndexedGraph>>,
128 pattern_stats: Arc<DashMap<QueryPattern, PatternStats>>,
130 adaptive_indexes: Arc<RwLock<HashMap<QueryPattern, Box<dyn AdaptiveIndex>>>>,
132 config: AdaptiveConfig,
134 last_maintenance: Arc<Mutex<Instant>>,
136 query_history: Arc<Mutex<VecDeque<(QueryPattern, Instant, Duration)>>>,
138}
139
140impl AdaptiveIndexManager {
141 pub fn new(base_graph: IndexedGraph, config: AdaptiveConfig) -> Self {
143 Self {
144 base_graph: Arc::new(RwLock::new(base_graph)),
145 pattern_stats: Arc::new(DashMap::new()),
146 adaptive_indexes: Arc::new(RwLock::new(HashMap::new())),
147 config,
148 last_maintenance: Arc::new(Mutex::new(Instant::now())),
149 query_history: Arc::new(Mutex::new(VecDeque::new())),
150 }
151 }
152
153 pub fn query(
155 &self,
156 subject: Option<&Subject>,
157 predicate: Option<&Predicate>,
158 object: Option<&Object>,
159 ) -> Result<Vec<Triple>, OxirsError> {
160 let start = Instant::now();
161 let pattern = QueryPattern::from_components(subject, predicate, object);
162
163 let result = {
165 let indexes = self.adaptive_indexes.read();
166 if let Some(index) = indexes.get(&pattern) {
167 index.query(subject, predicate, object)
169 } else {
170 let graph = self.base_graph.read();
172 Ok(graph.match_pattern(subject, predicate, object))
173 }
174 }?;
175
176 let duration = start.elapsed();
177
178 self.update_pattern_stats(pattern, duration, result.len());
180
181 {
183 let mut history = self.query_history.lock();
184 history.push_back((pattern, Instant::now(), duration));
185
186 let cutoff = Instant::now() - self.config.analysis_window;
188 while let Some((_, timestamp, _)) = history.front() {
189 if *timestamp < cutoff {
190 history.pop_front();
191 } else {
192 break;
193 }
194 }
195 }
196
197 self.maybe_run_maintenance();
199
200 Ok(result)
201 }
202
203 fn update_pattern_stats(&self, pattern: QueryPattern, duration: Duration, result_size: usize) {
205 let mut stats = self.pattern_stats.entry(pattern).or_default();
206
207 let now = Instant::now();
208 let time_since_last = now.duration_since(stats.last_queried).as_secs_f64();
209
210 stats.query_count += 1;
212 stats.total_time += duration;
213 stats.avg_result_size = (stats.avg_result_size * (stats.query_count - 1) as f64
214 + result_size as f64)
215 / stats.query_count as f64;
216
217 if time_since_last > 0.0 {
219 let instant_frequency = 1.0 / time_since_last;
220 stats.query_frequency = stats.query_frequency * 0.9 + instant_frequency * 0.1;
221 }
222
223 stats.last_queried = now;
224 }
225
226 fn maybe_run_maintenance(&self) {
228 let mut last_maintenance = self.last_maintenance.lock();
229 if last_maintenance.elapsed() >= self.config.maintenance_interval {
230 *last_maintenance = Instant::now();
231 drop(last_maintenance);
232
233 let self_clone = self.clone();
235 std::thread::spawn(move || {
236 self_clone.run_maintenance_internal();
237 });
238 }
239 }
240
241 pub fn run_maintenance(&self) {
248 self.run_maintenance_internal();
249 }
250
251 fn run_maintenance_internal(&self) {
253 let patterns_to_index = self.analyze_patterns();
255
256 for pattern in patterns_to_index {
258 self.create_adaptive_index(pattern);
259 }
260
261 self.cleanup_indexes();
263 }
264
265 fn analyze_patterns(&self) -> Vec<QueryPattern> {
267 let mut candidates = Vec::new();
268
269 for entry in self.pattern_stats.iter() {
270 let (pattern, stats) = entry.pair();
271
272 if self.adaptive_indexes.read().contains_key(pattern) {
274 continue;
275 }
276
277 if stats.query_count >= self.config.min_queries_for_index
279 && stats.query_frequency >= self.config.min_frequency_for_index
280 {
281 if let Some(benefit) = self.estimate_index_benefit(*pattern, stats) {
283 if benefit >= self.config.index_cost_threshold {
284 candidates.push((*pattern, benefit));
285 }
286 }
287 }
288 }
289
290 candidates.sort_by(|a, b| {
292 b.1.partial_cmp(&a.1)
293 .expect("benefit scores should be finite")
294 });
295 candidates.truncate(self.config.max_adaptive_indexes);
296
297 candidates.into_iter().map(|(pattern, _)| pattern).collect()
298 }
299
300 fn estimate_index_benefit(&self, _pattern: QueryPattern, stats: &PatternStats) -> Option<f64> {
302 let graph = self.base_graph.read();
304 let total_triples = graph.len() as f64;
305
306 let scan_cost = total_triples;
308
309 let index_cost = stats.avg_result_size;
311
312 if scan_cost > 0.0 {
313 Some((scan_cost - index_cost) / scan_cost)
314 } else {
315 None
316 }
317 }
318
319 fn create_adaptive_index(&self, pattern: QueryPattern) {
321 let mut indexes = self.adaptive_indexes.write();
322
323 if indexes.len() >= self.config.max_adaptive_indexes {
325 return;
326 }
327
328 let index: Box<dyn AdaptiveIndex> = match pattern {
330 QueryPattern::PredicateQuery => Box::new(PredicateIndex::new(self.base_graph.clone())),
331 QueryPattern::SubjectPredicate => {
332 Box::new(SubjectPredicateIndex::new(self.base_graph.clone()))
333 }
334 _ => return, };
336
337 indexes.insert(pattern, index);
338 }
339
340 fn cleanup_indexes(&self) {
342 let mut indexes = self.adaptive_indexes.write();
343 let stats = self.pattern_stats.clone();
344
345 indexes.retain(|pattern, _| {
346 match stats.get(pattern) {
347 Some(pattern_stats) => {
348 pattern_stats.query_frequency >= self.config.min_frequency_for_index * 0.5
350 }
351 _ => false,
352 }
353 });
354 }
355
356 pub fn get_stats(&self) -> AdaptiveIndexStats {
358 let pattern_stats: HashMap<QueryPattern, PatternStats> = self
359 .pattern_stats
360 .iter()
361 .map(|entry| (*entry.key(), entry.value().clone()))
362 .collect();
363
364 let active_indexes: Vec<QueryPattern> =
365 self.adaptive_indexes.read().keys().copied().collect();
366
367 let total_queries = pattern_stats.values().map(|s| s.query_count).sum();
368
369 AdaptiveIndexStats {
370 pattern_stats,
371 active_indexes,
372 total_queries,
373 }
374 }
375
376 pub fn insert(&self, triple: Triple) -> Result<bool, OxirsError> {
378 let inserted = self.base_graph.write().insert(&triple);
380
381 if inserted {
382 let indexes = self.adaptive_indexes.read();
384 for index in indexes.values() {
385 index.insert(&triple)?;
386 }
387 }
388
389 Ok(inserted)
390 }
391
392 pub fn remove(&self, triple: &Triple) -> Result<bool, OxirsError> {
394 let removed = self.base_graph.write().remove(triple);
396
397 if removed {
398 let indexes = self.adaptive_indexes.read();
400 for index in indexes.values() {
401 index.remove(triple)?;
402 }
403 }
404
405 Ok(removed)
406 }
407}
408
409impl Clone for AdaptiveIndexManager {
411 fn clone(&self) -> Self {
412 Self {
413 base_graph: self.base_graph.clone(),
414 pattern_stats: self.pattern_stats.clone(),
415 adaptive_indexes: self.adaptive_indexes.clone(),
416 config: self.config.clone(),
417 last_maintenance: Arc::new(Mutex::new(*self.last_maintenance.lock())),
418 query_history: self.query_history.clone(),
419 }
420 }
421}
422
423trait AdaptiveIndex: Send + Sync {
425 fn query(
427 &self,
428 subject: Option<&Subject>,
429 predicate: Option<&Predicate>,
430 object: Option<&Object>,
431 ) -> Result<Vec<Triple>, OxirsError>;
432
433 fn insert(&self, triple: &Triple) -> Result<(), OxirsError>;
435
436 fn remove(&self, triple: &Triple) -> Result<(), OxirsError>;
438}
439
440struct PredicateIndex {
442 base_graph: Arc<RwLock<IndexedGraph>>,
443 predicate_map: Arc<DashMap<Predicate, Vec<Triple>>>,
444}
445
446impl PredicateIndex {
447 fn new(base_graph: Arc<RwLock<IndexedGraph>>) -> Self {
448 let index = Self {
449 base_graph: base_graph.clone(),
450 predicate_map: Arc::new(DashMap::new()),
451 };
452
453 let graph = base_graph.read();
455 for triple in graph.iter() {
456 index
457 .predicate_map
458 .entry(triple.predicate().clone())
459 .or_default()
460 .push(triple);
461 }
462
463 index
464 }
465}
466
467impl AdaptiveIndex for PredicateIndex {
468 fn query(
469 &self,
470 subject: Option<&Subject>,
471 predicate: Option<&Predicate>,
472 object: Option<&Object>,
473 ) -> Result<Vec<Triple>, OxirsError> {
474 if let Some(pred) = predicate {
475 if let Some(triples) = self.predicate_map.get(pred) {
476 let results: Vec<Triple> = triples
477 .iter()
478 .filter(|t| {
479 subject.map_or(true, |s| t.subject() == s)
480 && object.map_or(true, |o| t.object() == o)
481 })
482 .cloned()
483 .collect();
484 return Ok(results);
485 }
486 }
487
488 let graph = self.base_graph.read();
490 Ok(graph.match_pattern(subject, predicate, object))
491 }
492
493 fn insert(&self, triple: &Triple) -> Result<(), OxirsError> {
494 self.predicate_map
495 .entry(triple.predicate().clone())
496 .or_default()
497 .push(triple.clone());
498 Ok(())
499 }
500
501 fn remove(&self, triple: &Triple) -> Result<(), OxirsError> {
502 if let Some(mut triples) = self.predicate_map.get_mut(triple.predicate()) {
503 triples.retain(|t| t != triple);
504 }
505 Ok(())
506 }
507}
508
509struct SubjectPredicateIndex {
511 base_graph: Arc<RwLock<IndexedGraph>>,
512 sp_map: Arc<DashMap<(Subject, Predicate), Vec<Object>>>,
513}
514
515impl SubjectPredicateIndex {
516 fn new(base_graph: Arc<RwLock<IndexedGraph>>) -> Self {
517 let index = Self {
518 base_graph: base_graph.clone(),
519 sp_map: Arc::new(DashMap::new()),
520 };
521
522 let graph = base_graph.read();
524 for triple in graph.iter() {
525 index
526 .sp_map
527 .entry((triple.subject().clone(), triple.predicate().clone()))
528 .or_default()
529 .push(triple.object().clone());
530 }
531
532 index
533 }
534}
535
536impl AdaptiveIndex for SubjectPredicateIndex {
537 fn query(
538 &self,
539 subject: Option<&Subject>,
540 predicate: Option<&Predicate>,
541 object: Option<&Object>,
542 ) -> Result<Vec<Triple>, OxirsError> {
543 if let (Some(subj), Some(pred)) = (subject, predicate) {
544 if let Some(objects) = self.sp_map.get(&(subj.clone(), pred.clone())) {
545 let results: Vec<Triple> = objects
546 .iter()
547 .filter(|o| object.map_or(true, |obj| *o == obj))
548 .map(|o| Triple::new(subj.clone(), pred.clone(), o.clone()))
549 .collect();
550 return Ok(results);
551 }
552 }
553
554 let graph = self.base_graph.read();
556 Ok(graph.match_pattern(subject, predicate, object))
557 }
558
559 fn insert(&self, triple: &Triple) -> Result<(), OxirsError> {
560 self.sp_map
561 .entry((triple.subject().clone(), triple.predicate().clone()))
562 .or_default()
563 .push(triple.object().clone());
564 Ok(())
565 }
566
567 fn remove(&self, triple: &Triple) -> Result<(), OxirsError> {
568 if let Some(mut objects) = self
569 .sp_map
570 .get_mut(&(triple.subject().clone(), triple.predicate().clone()))
571 {
572 objects.retain(|o| o != triple.object());
573 }
574 Ok(())
575 }
576}
577
578#[derive(Debug, Clone)]
580pub struct AdaptiveIndexStats {
581 pub pattern_stats: HashMap<QueryPattern, PatternStats>,
582 pub active_indexes: Vec<QueryPattern>,
583 pub total_queries: u64,
584}
585
586#[cfg(test)]
587mod tests {
588 use super::*;
589 use crate::model::NamedNode;
590
/// `from_components` classifies a lookup by which components are bound.
#[test]
fn test_query_pattern_detection() {
    let s = Subject::NamedNode(NamedNode::new("http://s").expect("valid IRI"));
    let p = Predicate::NamedNode(NamedNode::new("http://p").expect("valid IRI"));
    let o = Object::NamedNode(NamedNode::new("http://o").expect("valid IRI"));

    let cases = [
        (Some(&s), Some(&p), Some(&o), QueryPattern::SpecificTriple),
        (Some(&s), Some(&p), None, QueryPattern::SubjectPredicate),
        (None, Some(&p), None, QueryPattern::PredicateQuery),
        (None, None, None, QueryPattern::FullScan),
    ];

    for (subject, predicate, object, expected) in cases {
        assert_eq!(
            QueryPattern::from_components(subject, predicate, object),
            expected
        );
    }
}
614
/// Inserting, querying and running maintenance through the manager works end
/// to end and the queries show up in the statistics.
#[test]
fn test_adaptive_index_creation() {
    let config = AdaptiveConfig {
        min_queries_for_index: 2,
        min_frequency_for_index: 0.01,
        ..Default::default()
    };
    let manager = AdaptiveIndexManager::new(IndexedGraph::new(), config);

    // Ten triples that all share one predicate.
    (0..10).for_each(|i| {
        let triple = Triple::new(
            NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
            NamedNode::new("http://p").expect("valid IRI"),
            NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
        );
        manager.insert(triple).expect("insert should succeed");
    });

    let pred = Predicate::NamedNode(NamedNode::new("http://p").expect("valid IRI"));
    (0..3).for_each(|_| {
        let results = manager
            .query(None, Some(&pred), None)
            .expect("query should succeed");
        assert_eq!(results.len(), 10);
    });

    manager.run_maintenance();

    let stats = manager.get_stats();
    assert!(stats.total_queries >= 3);
}
652
/// A predicate index built over an existing graph returns exactly the triples
/// stored under each predicate.
#[test]
fn test_predicate_index() {
    let graph = Arc::new(RwLock::new(IndexedGraph::new()));

    // Adds `count` triples `s{i} <predicate> o{i}` to the graph.
    let populate = |predicate_iri: &str, count: usize| {
        for i in 0..count {
            let triple = Triple::new(
                NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
                NamedNode::new(predicate_iri).expect("valid IRI"),
                NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
            );
            graph.write().insert(&triple);
        }
    };
    populate("http://p1", 5);
    populate("http://p2", 3);

    let index = PredicateIndex::new(graph.clone());

    for (predicate_iri, expected_len) in [("http://p1", 5), ("http://p2", 3)] {
        let predicate = Predicate::NamedNode(NamedNode::new(predicate_iri).expect("valid IRI"));
        let results = index
            .query(None, Some(&predicate), None)
            .expect("index query should succeed");
        assert_eq!(results.len(), expected_len);
    }
}
691
/// A subject-predicate index built over an existing graph returns every
/// object stored for the pair.
#[test]
fn test_subject_predicate_index() {
    let s1 = Subject::NamedNode(NamedNode::new("http://s1").expect("valid IRI"));
    let p1 = Predicate::NamedNode(NamedNode::new("http://p1").expect("valid IRI"));

    let graph = Arc::new(RwLock::new(IndexedGraph::new()));
    (0..5).for_each(|i| {
        let object = Object::NamedNode(
            NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
        );
        let triple = Triple::new(s1.clone(), p1.clone(), object);
        graph.write().insert(&triple);
    });

    let index = SubjectPredicateIndex::new(graph.clone());

    let results = index
        .query(Some(&s1), Some(&p1), None)
        .expect("index query should succeed");
    assert_eq!(results.len(), 5);
}
719}