Skip to main content

oxirs_core/store/
adaptive_index.rs

1//! Adaptive indexing for RDF graphs that automatically adjusts based on query patterns
2//!
3//! This module provides indexes that learn from query patterns and automatically
4//! create, update, or remove indexes to optimize query performance.
5
6use crate::model::{Object, Predicate, Subject, Triple};
7use crate::store::{IndexType, IndexedGraph};
8use crate::OxirsError;
9use dashmap::DashMap;
10use parking_lot::{Mutex, RwLock};
11use std::collections::{HashMap, VecDeque};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
/// Shape of a triple-pattern query, used as the key for query statistics.
///
/// In the notation below a letter means the position is bound to a concrete
/// term and `?` means it is a wildcard.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryPattern {
    /// Subject-based query (s ? ?)
    SubjectQuery,
    /// Predicate-based query (? p ?)
    PredicateQuery,
    /// Object-based query (? ? o)
    ObjectQuery,
    /// Subject-Predicate query (s p ?)
    SubjectPredicate,
    /// Subject-Object query (s ? o)
    SubjectObject,
    /// Predicate-Object query (? p o)
    PredicateObject,
    /// Specific triple query (s p o)
    SpecificTriple,
    /// Full scan query (? ? ?)
    FullScan,
}
35
36impl QueryPattern {
37    /// Determine pattern from optional components
38    pub fn from_components(
39        subject: Option<&Subject>,
40        predicate: Option<&Predicate>,
41        object: Option<&Object>,
42    ) -> Self {
43        match (subject.is_some(), predicate.is_some(), object.is_some()) {
44            (true, true, true) => QueryPattern::SpecificTriple,
45            (true, true, false) => QueryPattern::SubjectPredicate,
46            (true, false, true) => QueryPattern::SubjectObject,
47            (false, true, true) => QueryPattern::PredicateObject,
48            (true, false, false) => QueryPattern::SubjectQuery,
49            (false, true, false) => QueryPattern::PredicateQuery,
50            (false, false, true) => QueryPattern::ObjectQuery,
51            (false, false, false) => QueryPattern::FullScan,
52        }
53    }
54
55    /// Get recommended index type for this pattern
56    pub fn recommended_index(&self) -> Option<IndexType> {
57        match self {
58            QueryPattern::SubjectQuery | QueryPattern::SubjectPredicate => Some(IndexType::SPO),
59            QueryPattern::PredicateQuery | QueryPattern::PredicateObject => Some(IndexType::POS),
60            QueryPattern::ObjectQuery | QueryPattern::SubjectObject => Some(IndexType::OSP),
61            QueryPattern::SpecificTriple => Some(IndexType::SPO), // Any index works
62            QueryPattern::FullScan => None,                       // No specific index helps
63        }
64    }
65}
66
/// Running statistics collected for one query pattern.
#[derive(Debug, Clone)]
pub struct PatternStats {
    /// How many queries with this pattern have been recorded
    pub query_count: u64,
    /// Cumulative time spent answering those queries
    pub total_time: Duration,
    /// Mean number of triples returned per query
    pub avg_result_size: f64,
    /// Timestamp of the most recent query (the creation time until the first query)
    pub last_queried: Instant,
    /// Exponentially smoothed query rate, in queries per second
    pub query_frequency: f64,
}

impl Default for PatternStats {
    /// Empty statistics, stamped with the moment they were created.
    fn default() -> Self {
        let created_at = Instant::now();
        Self {
            last_queried: created_at,
            query_count: 0,
            total_time: Duration::from_secs(0),
            avg_result_size: 0.0,
            query_frequency: 0.0,
        }
    }
}
93
/// Tuning knobs that control when adaptive indexes are created and dropped.
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
    /// Queries a pattern must accumulate before an index is considered for it
    pub min_queries_for_index: u64,
    /// Smoothed query rate (queries/sec) a pattern must reach before an index is considered
    pub min_frequency_for_index: f64,
    /// Upper bound on the number of adaptive indexes kept at once
    pub max_adaptive_indexes: usize,
    /// How far back the query history is retained
    pub analysis_window: Duration,
    /// Minimum time between automatic maintenance passes
    pub maintenance_interval: Duration,
    /// Minimum estimated benefit, as the fraction of full-scan cost saved,
    /// required to create an index
    pub index_cost_threshold: f64,
}

impl Default for AdaptiveConfig {
    fn default() -> Self {
        Self {
            min_queries_for_index: 100,
            min_frequency_for_index: 0.1,
            max_adaptive_indexes: 5,
            analysis_window: Duration::from_secs(5 * 60),
            maintenance_interval: Duration::from_secs(60),
            // An index must save at least half of a full scan's cost.
            index_cost_threshold: 0.5,
        }
    }
}
123
124/// Adaptive index manager
125pub struct AdaptiveIndexManager {
126    /// Base indexed graph
127    base_graph: Arc<RwLock<IndexedGraph>>,
128    /// Query pattern statistics
129    pattern_stats: Arc<DashMap<QueryPattern, PatternStats>>,
130    /// Currently active adaptive indexes
131    adaptive_indexes: Arc<RwLock<HashMap<QueryPattern, Box<dyn AdaptiveIndex>>>>,
132    /// Configuration
133    config: AdaptiveConfig,
134    /// Last maintenance timestamp
135    last_maintenance: Arc<Mutex<Instant>>,
136    /// Query history for pattern analysis
137    query_history: Arc<Mutex<VecDeque<(QueryPattern, Instant, Duration)>>>,
138}
139
140impl AdaptiveIndexManager {
141    /// Create a new adaptive index manager
142    pub fn new(base_graph: IndexedGraph, config: AdaptiveConfig) -> Self {
143        Self {
144            base_graph: Arc::new(RwLock::new(base_graph)),
145            pattern_stats: Arc::new(DashMap::new()),
146            adaptive_indexes: Arc::new(RwLock::new(HashMap::new())),
147            config,
148            last_maintenance: Arc::new(Mutex::new(Instant::now())),
149            query_history: Arc::new(Mutex::new(VecDeque::new())),
150        }
151    }
152
153    /// Execute a query with adaptive indexing
154    pub fn query(
155        &self,
156        subject: Option<&Subject>,
157        predicate: Option<&Predicate>,
158        object: Option<&Object>,
159    ) -> Result<Vec<Triple>, OxirsError> {
160        let start = Instant::now();
161        let pattern = QueryPattern::from_components(subject, predicate, object);
162
163        // Check if we have an adaptive index for this pattern
164        let result = {
165            let indexes = self.adaptive_indexes.read();
166            if let Some(index) = indexes.get(&pattern) {
167                // Use adaptive index
168                index.query(subject, predicate, object)
169            } else {
170                // Fall back to base graph
171                let graph = self.base_graph.read();
172                Ok(graph.match_pattern(subject, predicate, object))
173            }
174        }?;
175
176        let duration = start.elapsed();
177
178        // Update statistics
179        self.update_pattern_stats(pattern, duration, result.len());
180
181        // Record in history
182        {
183            let mut history = self.query_history.lock();
184            history.push_back((pattern, Instant::now(), duration));
185
186            // Keep only recent history
187            let cutoff = Instant::now() - self.config.analysis_window;
188            while let Some((_, timestamp, _)) = history.front() {
189                if *timestamp < cutoff {
190                    history.pop_front();
191                } else {
192                    break;
193                }
194            }
195        }
196
197        // Check if maintenance is needed
198        self.maybe_run_maintenance();
199
200        Ok(result)
201    }
202
203    /// Update statistics for a query pattern
204    fn update_pattern_stats(&self, pattern: QueryPattern, duration: Duration, result_size: usize) {
205        let mut stats = self.pattern_stats.entry(pattern).or_default();
206
207        let now = Instant::now();
208        let time_since_last = now.duration_since(stats.last_queried).as_secs_f64();
209
210        // Update basic stats
211        stats.query_count += 1;
212        stats.total_time += duration;
213        stats.avg_result_size = (stats.avg_result_size * (stats.query_count - 1) as f64
214            + result_size as f64)
215            / stats.query_count as f64;
216
217        // Update frequency with exponential moving average
218        if time_since_last > 0.0 {
219            let instant_frequency = 1.0 / time_since_last;
220            stats.query_frequency = stats.query_frequency * 0.9 + instant_frequency * 0.1;
221        }
222
223        stats.last_queried = now;
224    }
225
226    /// Run maintenance if needed
227    fn maybe_run_maintenance(&self) {
228        let mut last_maintenance = self.last_maintenance.lock();
229        if last_maintenance.elapsed() >= self.config.maintenance_interval {
230            *last_maintenance = Instant::now();
231            drop(last_maintenance);
232
233            // Run maintenance in background
234            let self_clone = self.clone();
235            std::thread::spawn(move || {
236                self_clone.run_maintenance_internal();
237            });
238        }
239    }
240
241    /// Run index maintenance (public for testing only)
242    ///
243    /// # Note
244    /// This method is only intended for use in tests to trigger maintenance
245    /// manually. In production, maintenance runs automatically based on the
246    /// configured maintenance interval.
247    pub fn run_maintenance(&self) {
248        self.run_maintenance_internal();
249    }
250
251    /// Run index maintenance (internal)
252    fn run_maintenance_internal(&self) {
253        // Analyze patterns and create/remove indexes
254        let patterns_to_index = self.analyze_patterns();
255
256        // Create new indexes
257        for pattern in patterns_to_index {
258            self.create_adaptive_index(pattern);
259        }
260
261        // Remove underused indexes
262        self.cleanup_indexes();
263    }
264
265    /// Analyze query patterns to determine which indexes to create
266    fn analyze_patterns(&self) -> Vec<QueryPattern> {
267        let mut candidates = Vec::new();
268
269        for entry in self.pattern_stats.iter() {
270            let (pattern, stats) = entry.pair();
271
272            // Skip if already indexed
273            if self.adaptive_indexes.read().contains_key(pattern) {
274                continue;
275            }
276
277            // Check if pattern qualifies for indexing
278            if stats.query_count >= self.config.min_queries_for_index
279                && stats.query_frequency >= self.config.min_frequency_for_index
280            {
281                // Estimate cost benefit
282                if let Some(benefit) = self.estimate_index_benefit(*pattern, stats) {
283                    if benefit >= self.config.index_cost_threshold {
284                        candidates.push((*pattern, benefit));
285                    }
286                }
287            }
288        }
289
290        // Sort by benefit and take top candidates
291        candidates.sort_by(|a, b| {
292            b.1.partial_cmp(&a.1)
293                .expect("benefit scores should be finite")
294        });
295        candidates.truncate(self.config.max_adaptive_indexes);
296
297        candidates.into_iter().map(|(pattern, _)| pattern).collect()
298    }
299
300    /// Estimate the benefit of creating an index for a pattern
301    fn estimate_index_benefit(&self, _pattern: QueryPattern, stats: &PatternStats) -> Option<f64> {
302        // Simple cost model: benefit = (scan_cost - index_cost) / scan_cost
303        let graph = self.base_graph.read();
304        let total_triples = graph.len() as f64;
305
306        // Estimate scan cost (proportional to total triples)
307        let scan_cost = total_triples;
308
309        // Estimate index cost (proportional to result size)
310        let index_cost = stats.avg_result_size;
311
312        if scan_cost > 0.0 {
313            Some((scan_cost - index_cost) / scan_cost)
314        } else {
315            None
316        }
317    }
318
319    /// Create an adaptive index for a pattern
320    fn create_adaptive_index(&self, pattern: QueryPattern) {
321        let mut indexes = self.adaptive_indexes.write();
322
323        // Check capacity
324        if indexes.len() >= self.config.max_adaptive_indexes {
325            return;
326        }
327
328        // Create appropriate index type
329        let index: Box<dyn AdaptiveIndex> = match pattern {
330            QueryPattern::PredicateQuery => Box::new(PredicateIndex::new(self.base_graph.clone())),
331            QueryPattern::SubjectPredicate => {
332                Box::new(SubjectPredicateIndex::new(self.base_graph.clone()))
333            }
334            _ => return, // Add more index types as needed
335        };
336
337        indexes.insert(pattern, index);
338    }
339
340    /// Remove underused indexes
341    fn cleanup_indexes(&self) {
342        let mut indexes = self.adaptive_indexes.write();
343        let stats = self.pattern_stats.clone();
344
345        indexes.retain(|pattern, _| {
346            match stats.get(pattern) {
347                Some(pattern_stats) => {
348                    // Keep if still meeting frequency threshold
349                    pattern_stats.query_frequency >= self.config.min_frequency_for_index * 0.5
350                }
351                _ => false,
352            }
353        });
354    }
355
356    /// Get current statistics
357    pub fn get_stats(&self) -> AdaptiveIndexStats {
358        let pattern_stats: HashMap<QueryPattern, PatternStats> = self
359            .pattern_stats
360            .iter()
361            .map(|entry| (*entry.key(), entry.value().clone()))
362            .collect();
363
364        let active_indexes: Vec<QueryPattern> =
365            self.adaptive_indexes.read().keys().copied().collect();
366
367        let total_queries = pattern_stats.values().map(|s| s.query_count).sum();
368
369        AdaptiveIndexStats {
370            pattern_stats,
371            active_indexes,
372            total_queries,
373        }
374    }
375
376    /// Insert a triple and update indexes
377    pub fn insert(&self, triple: Triple) -> Result<bool, OxirsError> {
378        // Insert into base graph
379        let inserted = self.base_graph.write().insert(&triple);
380
381        if inserted {
382            // Update adaptive indexes
383            let indexes = self.adaptive_indexes.read();
384            for index in indexes.values() {
385                index.insert(&triple)?;
386            }
387        }
388
389        Ok(inserted)
390    }
391
392    /// Remove a triple and update indexes
393    pub fn remove(&self, triple: &Triple) -> Result<bool, OxirsError> {
394        // Remove from base graph
395        let removed = self.base_graph.write().remove(triple);
396
397        if removed {
398            // Update adaptive indexes
399            let indexes = self.adaptive_indexes.read();
400            for index in indexes.values() {
401                index.remove(triple)?;
402            }
403        }
404
405        Ok(removed)
406    }
407}
408
409// Implement Clone manually to avoid trait object issues
410impl Clone for AdaptiveIndexManager {
411    fn clone(&self) -> Self {
412        Self {
413            base_graph: self.base_graph.clone(),
414            pattern_stats: self.pattern_stats.clone(),
415            adaptive_indexes: self.adaptive_indexes.clone(),
416            config: self.config.clone(),
417            last_maintenance: Arc::new(Mutex::new(*self.last_maintenance.lock())),
418            query_history: self.query_history.clone(),
419        }
420    }
421}
422
423/// Trait for adaptive index implementations
424trait AdaptiveIndex: Send + Sync {
425    /// Query the index
426    fn query(
427        &self,
428        subject: Option<&Subject>,
429        predicate: Option<&Predicate>,
430        object: Option<&Object>,
431    ) -> Result<Vec<Triple>, OxirsError>;
432
433    /// Insert a triple
434    fn insert(&self, triple: &Triple) -> Result<(), OxirsError>;
435
436    /// Remove a triple
437    fn remove(&self, triple: &Triple) -> Result<(), OxirsError>;
438}
439
440/// Predicate-based adaptive index
441struct PredicateIndex {
442    base_graph: Arc<RwLock<IndexedGraph>>,
443    predicate_map: Arc<DashMap<Predicate, Vec<Triple>>>,
444}
445
446impl PredicateIndex {
447    fn new(base_graph: Arc<RwLock<IndexedGraph>>) -> Self {
448        let index = Self {
449            base_graph: base_graph.clone(),
450            predicate_map: Arc::new(DashMap::new()),
451        };
452
453        // Build initial index
454        let graph = base_graph.read();
455        for triple in graph.iter() {
456            index
457                .predicate_map
458                .entry(triple.predicate().clone())
459                .or_default()
460                .push(triple);
461        }
462
463        index
464    }
465}
466
467impl AdaptiveIndex for PredicateIndex {
468    fn query(
469        &self,
470        subject: Option<&Subject>,
471        predicate: Option<&Predicate>,
472        object: Option<&Object>,
473    ) -> Result<Vec<Triple>, OxirsError> {
474        if let Some(pred) = predicate {
475            if let Some(triples) = self.predicate_map.get(pred) {
476                let results: Vec<Triple> = triples
477                    .iter()
478                    .filter(|t| {
479                        subject.map_or(true, |s| t.subject() == s)
480                            && object.map_or(true, |o| t.object() == o)
481                    })
482                    .cloned()
483                    .collect();
484                return Ok(results);
485            }
486        }
487
488        // Fall back to base graph
489        let graph = self.base_graph.read();
490        Ok(graph.match_pattern(subject, predicate, object))
491    }
492
493    fn insert(&self, triple: &Triple) -> Result<(), OxirsError> {
494        self.predicate_map
495            .entry(triple.predicate().clone())
496            .or_default()
497            .push(triple.clone());
498        Ok(())
499    }
500
501    fn remove(&self, triple: &Triple) -> Result<(), OxirsError> {
502        if let Some(mut triples) = self.predicate_map.get_mut(triple.predicate()) {
503            triples.retain(|t| t != triple);
504        }
505        Ok(())
506    }
507}
508
509/// Subject-Predicate composite index
510struct SubjectPredicateIndex {
511    base_graph: Arc<RwLock<IndexedGraph>>,
512    sp_map: Arc<DashMap<(Subject, Predicate), Vec<Object>>>,
513}
514
515impl SubjectPredicateIndex {
516    fn new(base_graph: Arc<RwLock<IndexedGraph>>) -> Self {
517        let index = Self {
518            base_graph: base_graph.clone(),
519            sp_map: Arc::new(DashMap::new()),
520        };
521
522        // Build initial index
523        let graph = base_graph.read();
524        for triple in graph.iter() {
525            index
526                .sp_map
527                .entry((triple.subject().clone(), triple.predicate().clone()))
528                .or_default()
529                .push(triple.object().clone());
530        }
531
532        index
533    }
534}
535
536impl AdaptiveIndex for SubjectPredicateIndex {
537    fn query(
538        &self,
539        subject: Option<&Subject>,
540        predicate: Option<&Predicate>,
541        object: Option<&Object>,
542    ) -> Result<Vec<Triple>, OxirsError> {
543        if let (Some(subj), Some(pred)) = (subject, predicate) {
544            if let Some(objects) = self.sp_map.get(&(subj.clone(), pred.clone())) {
545                let results: Vec<Triple> = objects
546                    .iter()
547                    .filter(|o| object.map_or(true, |obj| *o == obj))
548                    .map(|o| Triple::new(subj.clone(), pred.clone(), o.clone()))
549                    .collect();
550                return Ok(results);
551            }
552        }
553
554        // Fall back to base graph
555        let graph = self.base_graph.read();
556        Ok(graph.match_pattern(subject, predicate, object))
557    }
558
559    fn insert(&self, triple: &Triple) -> Result<(), OxirsError> {
560        self.sp_map
561            .entry((triple.subject().clone(), triple.predicate().clone()))
562            .or_default()
563            .push(triple.object().clone());
564        Ok(())
565    }
566
567    fn remove(&self, triple: &Triple) -> Result<(), OxirsError> {
568        if let Some(mut objects) = self
569            .sp_map
570            .get_mut(&(triple.subject().clone(), triple.predicate().clone()))
571        {
572            objects.retain(|o| o != triple.object());
573        }
574        Ok(())
575    }
576}
577
578/// Statistics for adaptive indexing
579#[derive(Debug, Clone)]
580pub struct AdaptiveIndexStats {
581    pub pattern_stats: HashMap<QueryPattern, PatternStats>,
582    pub active_indexes: Vec<QueryPattern>,
583    pub total_queries: u64,
584}
585
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    #[test]
    fn test_query_pattern_detection() {
        let s = Subject::NamedNode(NamedNode::new("http://s").expect("valid IRI"));
        let p = Predicate::NamedNode(NamedNode::new("http://p").expect("valid IRI"));
        let o = Object::NamedNode(NamedNode::new("http://o").expect("valid IRI"));

        // Every bound/unbound combination maps to its own pattern.
        assert_eq!(
            QueryPattern::from_components(Some(&s), Some(&p), Some(&o)),
            QueryPattern::SpecificTriple
        );
        assert_eq!(
            QueryPattern::from_components(Some(&s), Some(&p), None),
            QueryPattern::SubjectPredicate
        );
        assert_eq!(
            QueryPattern::from_components(Some(&s), None, Some(&o)),
            QueryPattern::SubjectObject
        );
        assert_eq!(
            QueryPattern::from_components(None, Some(&p), Some(&o)),
            QueryPattern::PredicateObject
        );
        assert_eq!(
            QueryPattern::from_components(Some(&s), None, None),
            QueryPattern::SubjectQuery
        );
        assert_eq!(
            QueryPattern::from_components(None, Some(&p), None),
            QueryPattern::PredicateQuery
        );
        assert_eq!(
            QueryPattern::from_components(None, None, Some(&o)),
            QueryPattern::ObjectQuery
        );
        assert_eq!(
            QueryPattern::from_components(None, None, None),
            QueryPattern::FullScan
        );
    }

    #[test]
    fn test_adaptive_index_creation() {
        let graph = IndexedGraph::new();
        let config = AdaptiveConfig {
            min_queries_for_index: 2,
            min_frequency_for_index: 0.01,
            ..Default::default()
        };

        let manager = AdaptiveIndexManager::new(graph, config);

        // 10 triples use the predicate that will be queried...
        for i in 0..10 {
            let triple = Triple::new(
                NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
                NamedNode::new("http://p").expect("valid IRI"),
                NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
            );
            manager.insert(triple).expect("insert should succeed");
        }

        // ...and 30 use other predicates, so a predicate lookup touches only a
        // quarter of the graph: estimated benefit (40 - 10) / 40 = 0.75, above
        // the default 0.5 threshold.
        for i in 0..30 {
            let triple = Triple::new(
                NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
                NamedNode::new(format!("http://q{i}")).expect("valid IRI from format"),
                NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
            );
            manager.insert(triple).expect("insert should succeed");
        }

        // Query the same pattern multiple times
        let pred = Predicate::NamedNode(NamedNode::new("http://p").expect("valid IRI"));
        for _ in 0..3 {
            let results = manager
                .query(None, Some(&pred), None)
                .expect("query should succeed");
            assert_eq!(results.len(), 10);
        }

        // Force maintenance
        manager.run_maintenance();

        // The frequently used, selective predicate pattern should now be indexed.
        let stats = manager.get_stats();
        assert!(stats.total_queries >= 3);
        assert!(
            stats.active_indexes.contains(&QueryPattern::PredicateQuery),
            "expected a predicate index, active: {:?}",
            stats.active_indexes
        );

        // Answers served through the adaptive index match the base graph's.
        let results = manager
            .query(None, Some(&pred), None)
            .expect("indexed query should succeed");
        assert_eq!(results.len(), 10);
    }

    #[test]
    fn test_predicate_index() {
        let graph = Arc::new(RwLock::new(IndexedGraph::new()));

        // Insert test data
        for i in 0..5 {
            let triple = Triple::new(
                NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
                NamedNode::new("http://p1").expect("valid IRI"),
                NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
            );
            graph.write().insert(&triple);
        }

        for i in 0..3 {
            let triple = Triple::new(
                NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
                NamedNode::new("http://p2").expect("valid IRI"),
                NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
            );
            graph.write().insert(&triple);
        }

        let index = PredicateIndex::new(graph.clone());

        // Query by predicate
        let p1 = Predicate::NamedNode(NamedNode::new("http://p1").expect("valid IRI"));
        let results = index
            .query(None, Some(&p1), None)
            .expect("index query should succeed");
        assert_eq!(results.len(), 5);

        let p2 = Predicate::NamedNode(NamedNode::new("http://p2").expect("valid IRI"));
        let results = index
            .query(None, Some(&p2), None)
            .expect("index query should succeed");
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_subject_predicate_index() {
        let graph = Arc::new(RwLock::new(IndexedGraph::new()));

        // Insert test data
        let s1 = Subject::NamedNode(NamedNode::new("http://s1").expect("valid IRI"));
        let p1 = Predicate::NamedNode(NamedNode::new("http://p1").expect("valid IRI"));

        for i in 0..5 {
            let triple = Triple::new(
                s1.clone(),
                p1.clone(),
                Object::NamedNode(
                    NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
                ),
            );
            graph.write().insert(&triple);
        }

        let index = SubjectPredicateIndex::new(graph.clone());

        // Query by subject and predicate
        let results = index
            .query(Some(&s1), Some(&p1), None)
            .expect("index query should succeed");
        assert_eq!(results.len(), 5);
    }
}