use crate::model::{Object, Predicate, Subject, Triple};
use crate::store::{IndexType, IndexedGraph};
use crate::OxirsError;
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Shape of a triple-pattern query, classified by which of the subject,
/// predicate and object positions are bound (see `QueryPattern::from_components`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryPattern {
/// Only the subject is bound.
SubjectQuery,
/// Only the predicate is bound.
PredicateQuery,
/// Only the object is bound.
ObjectQuery,
/// Subject and predicate are bound; the object is open.
SubjectPredicate,
/// Subject and object are bound; the predicate is open.
SubjectObject,
/// Predicate and object are bound; the subject is open.
PredicateObject,
/// All three positions are bound.
SpecificTriple,
/// No position is bound.
FullScan,
}
impl QueryPattern {
    /// Classifies a query by which of its three positions carry a value.
    ///
    /// A `Some` argument counts as bound and a `None` argument as open; the
    /// values themselves are not inspected.
    pub fn from_components(
        subject: Option<&Subject>,
        predicate: Option<&Predicate>,
        object: Option<&Object>,
    ) -> Self {
        match (subject, predicate, object) {
            (Some(_), Some(_), Some(_)) => Self::SpecificTriple,
            (Some(_), Some(_), None) => Self::SubjectPredicate,
            (Some(_), None, Some(_)) => Self::SubjectObject,
            (None, Some(_), Some(_)) => Self::PredicateObject,
            (Some(_), None, None) => Self::SubjectQuery,
            (None, Some(_), None) => Self::PredicateQuery,
            (None, None, Some(_)) => Self::ObjectQuery,
            (None, None, None) => Self::FullScan,
        }
    }

    /// Returns the base index type suggested for this pattern, or `None` for
    /// a full scan.
    pub fn recommended_index(&self) -> Option<IndexType> {
        let index = match *self {
            Self::FullScan => return None,
            Self::SubjectQuery | Self::SubjectPredicate | Self::SpecificTriple => IndexType::SPO,
            Self::PredicateQuery | Self::PredicateObject => IndexType::POS,
            Self::ObjectQuery | Self::SubjectObject => IndexType::OSP,
        };
        Some(index)
    }
}
/// Running statistics collected for one query pattern.
#[derive(Debug, Clone)]
pub struct PatternStats {
    /// Number of queries recorded for the pattern.
    pub query_count: u64,
    /// Sum of the execution times of all recorded queries.
    pub total_time: Duration,
    /// Running mean of the number of triples returned per query.
    pub avg_result_size: f64,
    /// Time at which the pattern was last recorded (creation time until then).
    pub last_queried: Instant,
    /// Smoothed queries-per-second estimate: each update blends 90% of the
    /// previous value with 10% of the reciprocal of the seconds elapsed since
    /// the preceding query.
    pub query_frequency: f64,
}

impl Default for PatternStats {
    /// Produces empty statistics whose `last_queried` is the moment of creation.
    fn default() -> Self {
        let created_at = Instant::now();
        PatternStats {
            query_count: 0,
            total_time: Duration::from_secs(0),
            avg_result_size: 0.0,
            last_queried: created_at,
            query_frequency: 0.0,
        }
    }
}
/// Tuning knobs for adaptive index creation and maintenance.
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
    /// Minimum number of recorded queries before a pattern is considered for
    /// an adaptive index.
    pub min_queries_for_index: u64,
    /// Minimum smoothed query frequency before a pattern is considered for an
    /// adaptive index; half of this value is the floor for keeping one.
    pub min_frequency_for_index: f64,
    /// Upper bound on the number of adaptive indexes held at once.
    pub max_adaptive_indexes: usize,
    /// How far back the query history is retained.
    pub analysis_window: Duration,
    /// Minimum time between automatically triggered maintenance passes.
    pub maintenance_interval: Duration,
    /// Minimum estimated benefit a pattern must reach to become a candidate.
    pub index_cost_threshold: f64,
}

impl Default for AdaptiveConfig {
    /// Defaults: 100 queries, 0.1 frequency, 5 indexes, a 5-minute analysis
    /// window, 1-minute maintenance interval and a 0.5 benefit threshold.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(300);
        let one_minute = Duration::from_secs(60);
        AdaptiveConfig {
            min_queries_for_index: 100,
            min_frequency_for_index: 0.1,
            max_adaptive_indexes: 5,
            analysis_window: five_minutes,
            maintenance_interval: one_minute,
            index_cost_threshold: 0.5,
        }
    }
}
/// Wraps an `IndexedGraph`, records statistics about the query patterns it
/// serves, and keeps a set of pattern-specific adaptive indexes alongside it.
pub struct AdaptiveIndexManager {
// Graph that answers queries when no adaptive index is registered for the
// pattern; `insert` and `remove` write to it.
base_graph: Arc<RwLock<IndexedGraph>>,
// Per-pattern statistics, updated after every query.
pattern_stats: Arc<DashMap<QueryPattern, PatternStats>>,
// Adaptive indexes currently registered, keyed by the pattern they serve.
adaptive_indexes: Arc<RwLock<HashMap<QueryPattern, Box<dyn AdaptiveIndex>>>>,
// Thresholds and intervals steering index creation and maintenance.
config: AdaptiveConfig,
// Time at which maintenance was last triggered from the query path.
last_maintenance: Arc<Mutex<Instant>>,
// Recent queries as (pattern, time recorded, execution time), oldest first;
// entries older than `config.analysis_window` are pruned on every query.
query_history: Arc<Mutex<VecDeque<(QueryPattern, Instant, Duration)>>>,
}
impl AdaptiveIndexManager {
pub fn new(base_graph: IndexedGraph, config: AdaptiveConfig) -> Self {
Self {
base_graph: Arc::new(RwLock::new(base_graph)),
pattern_stats: Arc::new(DashMap::new()),
adaptive_indexes: Arc::new(RwLock::new(HashMap::new())),
config,
last_maintenance: Arc::new(Mutex::new(Instant::now())),
query_history: Arc::new(Mutex::new(VecDeque::new())),
}
}
pub fn query(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Result<Vec<Triple>, OxirsError> {
let start = Instant::now();
let pattern = QueryPattern::from_components(subject, predicate, object);
let result = {
let indexes = self.adaptive_indexes.read();
if let Some(index) = indexes.get(&pattern) {
index.query(subject, predicate, object)
} else {
let graph = self.base_graph.read();
Ok(graph.match_pattern(subject, predicate, object))
}
}?;
let duration = start.elapsed();
self.update_pattern_stats(pattern, duration, result.len());
{
let mut history = self.query_history.lock();
history.push_back((pattern, Instant::now(), duration));
let cutoff = Instant::now() - self.config.analysis_window;
while let Some((_, timestamp, _)) = history.front() {
if *timestamp < cutoff {
history.pop_front();
} else {
break;
}
}
}
self.maybe_run_maintenance();
Ok(result)
}
fn update_pattern_stats(&self, pattern: QueryPattern, duration: Duration, result_size: usize) {
let mut stats = self.pattern_stats.entry(pattern).or_default();
let now = Instant::now();
let time_since_last = now.duration_since(stats.last_queried).as_secs_f64();
stats.query_count += 1;
stats.total_time += duration;
stats.avg_result_size = (stats.avg_result_size * (stats.query_count - 1) as f64
+ result_size as f64)
/ stats.query_count as f64;
if time_since_last > 0.0 {
let instant_frequency = 1.0 / time_since_last;
stats.query_frequency = stats.query_frequency * 0.9 + instant_frequency * 0.1;
}
stats.last_queried = now;
}
fn maybe_run_maintenance(&self) {
let mut last_maintenance = self.last_maintenance.lock();
if last_maintenance.elapsed() >= self.config.maintenance_interval {
*last_maintenance = Instant::now();
drop(last_maintenance);
let self_clone = self.clone();
std::thread::spawn(move || {
self_clone.run_maintenance_internal();
});
}
}
pub fn run_maintenance(&self) {
self.run_maintenance_internal();
}
fn run_maintenance_internal(&self) {
let patterns_to_index = self.analyze_patterns();
for pattern in patterns_to_index {
self.create_adaptive_index(pattern);
}
self.cleanup_indexes();
}
fn analyze_patterns(&self) -> Vec<QueryPattern> {
let mut candidates = Vec::new();
for entry in self.pattern_stats.iter() {
let (pattern, stats) = entry.pair();
if self.adaptive_indexes.read().contains_key(pattern) {
continue;
}
if stats.query_count >= self.config.min_queries_for_index
&& stats.query_frequency >= self.config.min_frequency_for_index
{
if let Some(benefit) = self.estimate_index_benefit(*pattern, stats) {
if benefit >= self.config.index_cost_threshold {
candidates.push((*pattern, benefit));
}
}
}
}
candidates.sort_by(|a, b| {
b.1.partial_cmp(&a.1)
.expect("benefit scores should be finite")
});
candidates.truncate(self.config.max_adaptive_indexes);
candidates.into_iter().map(|(pattern, _)| pattern).collect()
}
fn estimate_index_benefit(&self, _pattern: QueryPattern, stats: &PatternStats) -> Option<f64> {
let graph = self.base_graph.read();
let total_triples = graph.len() as f64;
let scan_cost = total_triples;
let index_cost = stats.avg_result_size;
if scan_cost > 0.0 {
Some((scan_cost - index_cost) / scan_cost)
} else {
None
}
}
fn create_adaptive_index(&self, pattern: QueryPattern) {
let mut indexes = self.adaptive_indexes.write();
if indexes.len() >= self.config.max_adaptive_indexes {
return;
}
let index: Box<dyn AdaptiveIndex> = match pattern {
QueryPattern::PredicateQuery => Box::new(PredicateIndex::new(self.base_graph.clone())),
QueryPattern::SubjectPredicate => {
Box::new(SubjectPredicateIndex::new(self.base_graph.clone()))
}
_ => return, };
indexes.insert(pattern, index);
}
fn cleanup_indexes(&self) {
let mut indexes = self.adaptive_indexes.write();
let stats = self.pattern_stats.clone();
indexes.retain(|pattern, _| {
match stats.get(pattern) {
Some(pattern_stats) => {
pattern_stats.query_frequency >= self.config.min_frequency_for_index * 0.5
}
_ => false,
}
});
}
pub fn get_stats(&self) -> AdaptiveIndexStats {
let pattern_stats: HashMap<QueryPattern, PatternStats> = self
.pattern_stats
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect();
let active_indexes: Vec<QueryPattern> =
self.adaptive_indexes.read().keys().copied().collect();
let total_queries = pattern_stats.values().map(|s| s.query_count).sum();
AdaptiveIndexStats {
pattern_stats,
active_indexes,
total_queries,
}
}
pub fn insert(&self, triple: Triple) -> Result<bool, OxirsError> {
let inserted = self.base_graph.write().insert(&triple);
if inserted {
let indexes = self.adaptive_indexes.read();
for index in indexes.values() {
index.insert(&triple)?;
}
}
Ok(inserted)
}
pub fn remove(&self, triple: &Triple) -> Result<bool, OxirsError> {
let removed = self.base_graph.write().remove(triple);
if removed {
let indexes = self.adaptive_indexes.read();
for index in indexes.values() {
index.remove(triple)?;
}
}
Ok(removed)
}
}
impl Clone for AdaptiveIndexManager {
fn clone(&self) -> Self {
Self {
base_graph: self.base_graph.clone(),
pattern_stats: self.pattern_stats.clone(),
adaptive_indexes: self.adaptive_indexes.clone(),
config: self.config.clone(),
last_maintenance: Arc::new(Mutex::new(*self.last_maintenance.lock())),
query_history: self.query_history.clone(),
}
}
}
/// Interface of the pattern-specific indexes stored by `AdaptiveIndexManager`.
///
/// Implementations must be `Send + Sync`; the manager keeps them behind shared
/// `Arc`s and runs maintenance on a spawned thread.
trait AdaptiveIndex: Send + Sync {
/// Returns the triples matching the pattern; a `None` argument leaves that
/// position unconstrained.
fn query(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
) -> Result<Vec<Triple>, OxirsError>;
/// Adds `triple` to the index; the manager calls this for triples the base
/// graph reported as newly inserted.
fn insert(&self, triple: &Triple) -> Result<(), OxirsError>;
/// Removes `triple` from the index; the manager calls this for triples the
/// base graph reported as removed.
fn remove(&self, triple: &Triple) -> Result<(), OxirsError>;
}
/// Adaptive index that groups the graph's triples by predicate.
struct PredicateIndex {
// Graph consulted when a query cannot be answered from `predicate_map`.
base_graph: Arc<RwLock<IndexedGraph>>,
// All indexed triples, bucketed by their predicate.
predicate_map: Arc<DashMap<Predicate, Vec<Triple>>>,
}
impl PredicateIndex {
fn new(base_graph: Arc<RwLock<IndexedGraph>>) -> Self {
let index = Self {
base_graph: base_graph.clone(),
predicate_map: Arc::new(DashMap::new()),
};
let graph = base_graph.read();
for triple in graph.iter() {
index
.predicate_map
.entry(triple.predicate().clone())
.or_default()
.push(triple);
}
index
}
}
impl AdaptiveIndex for PredicateIndex {
    /// Answers from the predicate bucket when `predicate` is given and has
    /// one, filtering by `subject` and `object` if present; otherwise the
    /// query is delegated to the base graph.
    fn query(
        &self,
        subject: Option<&Subject>,
        predicate: Option<&Predicate>,
        object: Option<&Object>,
    ) -> Result<Vec<Triple>, OxirsError> {
        match predicate.and_then(|pred| self.predicate_map.get(pred)) {
            Some(bucket) => {
                let mut matching = Vec::new();
                for candidate in bucket.iter() {
                    if subject.map_or(true, |s| candidate.subject() == s)
                        && object.map_or(true, |o| candidate.object() == o)
                    {
                        matching.push(candidate.clone());
                    }
                }
                Ok(matching)
            }
            None => Ok(self
                .base_graph
                .read()
                .match_pattern(subject, predicate, object)),
        }
    }

    /// Appends a copy of `triple` to the bucket of its predicate, creating
    /// the bucket if needed.
    fn insert(&self, triple: &Triple) -> Result<(), OxirsError> {
        let key = triple.predicate().clone();
        let mut bucket = self.predicate_map.entry(key).or_default();
        bucket.push(triple.clone());
        Ok(())
    }

    /// Drops every stored copy of `triple` from the bucket of its predicate;
    /// a missing bucket is not an error.
    fn remove(&self, triple: &Triple) -> Result<(), OxirsError> {
        let bucket = self.predicate_map.get_mut(triple.predicate());
        if let Some(mut stored) = bucket {
            stored.retain(|existing| existing != triple);
        }
        Ok(())
    }
}
/// Adaptive index that maps each (subject, predicate) pair to its objects.
struct SubjectPredicateIndex {
// Graph consulted when a query cannot be answered from `sp_map`.
base_graph: Arc<RwLock<IndexedGraph>>,
// Objects of all indexed triples, keyed by the triple's subject and predicate.
sp_map: Arc<DashMap<(Subject, Predicate), Vec<Object>>>,
}
impl SubjectPredicateIndex {
fn new(base_graph: Arc<RwLock<IndexedGraph>>) -> Self {
let index = Self {
base_graph: base_graph.clone(),
sp_map: Arc::new(DashMap::new()),
};
let graph = base_graph.read();
for triple in graph.iter() {
index
.sp_map
.entry((triple.subject().clone(), triple.predicate().clone()))
.or_default()
.push(triple.object().clone());
}
index
}
}
impl AdaptiveIndex for SubjectPredicateIndex {
    /// Answers from the (subject, predicate) entry when both are given and the
    /// entry exists, rebuilding triples from the stored objects and filtering
    /// by `object` if present; otherwise the query is delegated to the base
    /// graph.
    fn query(
        &self,
        subject: Option<&Subject>,
        predicate: Option<&Predicate>,
        object: Option<&Object>,
    ) -> Result<Vec<Triple>, OxirsError> {
        if let (Some(subj), Some(pred)) = (subject, predicate) {
            let key = (subj.clone(), pred.clone());
            let entry = self.sp_map.get(&key);
            if let Some(objects) = entry {
                let mut triples = Vec::new();
                for stored in objects.iter() {
                    if object.map_or(true, |wanted| stored == wanted) {
                        triples.push(Triple::new(subj.clone(), pred.clone(), stored.clone()));
                    }
                }
                return Ok(triples);
            }
        }

        Ok(self
            .base_graph
            .read()
            .match_pattern(subject, predicate, object))
    }

    /// Appends the object of `triple` to the entry for its subject and
    /// predicate, creating the entry if needed.
    fn insert(&self, triple: &Triple) -> Result<(), OxirsError> {
        let key = (triple.subject().clone(), triple.predicate().clone());
        let mut objects = self.sp_map.entry(key).or_default();
        objects.push(triple.object().clone());
        Ok(())
    }

    /// Drops every stored object equal to the object of `triple` from the
    /// entry for its subject and predicate; a missing entry is not an error.
    fn remove(&self, triple: &Triple) -> Result<(), OxirsError> {
        let key = (triple.subject().clone(), triple.predicate().clone());
        let entry = self.sp_map.get_mut(&key);
        if let Some(mut objects) = entry {
            objects.retain(|stored| stored != triple.object());
        }
        Ok(())
    }
}
/// Snapshot of the manager's state as returned by `AdaptiveIndexManager::get_stats`.
#[derive(Debug, Clone)]
pub struct AdaptiveIndexStats {
/// Copy of the statistics recorded for each query pattern.
pub pattern_stats: HashMap<QueryPattern, PatternStats>,
/// Patterns that had an adaptive index when the snapshot was taken.
pub active_indexes: Vec<QueryPattern>,
/// Sum of `query_count` over all patterns in `pattern_stats`.
pub total_queries: u64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    /// Bound/unbound combinations map to the expected pattern.
    #[test]
    fn test_query_pattern_detection() {
        let s = Subject::NamedNode(NamedNode::new("http://s").expect("valid IRI"));
        let p = Predicate::NamedNode(NamedNode::new("http://p").expect("valid IRI"));
        let o = Object::NamedNode(NamedNode::new("http://o").expect("valid IRI"));

        let cases = [
            (Some(&s), Some(&p), Some(&o), QueryPattern::SpecificTriple),
            (Some(&s), Some(&p), None, QueryPattern::SubjectPredicate),
            (None, Some(&p), None, QueryPattern::PredicateQuery),
            (None, None, None, QueryPattern::FullScan),
        ];
        for (subject, predicate, object, expected) in cases {
            assert_eq!(
                QueryPattern::from_components(subject, predicate, object),
                expected
            );
        }
    }

    /// Queries are answered and counted, and a manual maintenance pass runs
    /// without error.
    #[test]
    fn test_adaptive_index_creation() {
        let config = AdaptiveConfig {
            min_queries_for_index: 2,
            min_frequency_for_index: 0.01,
            ..Default::default()
        };
        let manager = AdaptiveIndexManager::new(IndexedGraph::new(), config);

        for i in 0..10 {
            let subject = NamedNode::new(format!("http://s{i}")).expect("valid IRI from format");
            let predicate = NamedNode::new("http://p").expect("valid IRI");
            let object = NamedNode::new(format!("http://o{i}")).expect("valid IRI from format");
            manager
                .insert(Triple::new(subject, predicate, object))
                .expect("insert should succeed");
        }

        let pred = Predicate::NamedNode(NamedNode::new("http://p").expect("valid IRI"));
        (0..3).for_each(|_| {
            let results = manager
                .query(None, Some(&pred), None)
                .expect("query should succeed");
            assert_eq!(results.len(), 10);
        });

        manager.run_maintenance();

        let stats = manager.get_stats();
        assert!(stats.total_queries >= 3);
    }

    /// A predicate index returns exactly the triples stored for each predicate.
    #[test]
    fn test_predicate_index() {
        let graph = Arc::new(RwLock::new(IndexedGraph::new()));

        // (predicate IRI, number of triples stored under it)
        let fixtures = [("http://p1", 5), ("http://p2", 3)];
        for (predicate_iri, count) in fixtures {
            for i in 0..count {
                let triple = Triple::new(
                    NamedNode::new(format!("http://s{i}")).expect("valid IRI from format"),
                    NamedNode::new(predicate_iri).expect("valid IRI"),
                    NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
                );
                graph.write().insert(&triple);
            }
        }

        let index = PredicateIndex::new(graph.clone());
        for (predicate_iri, count) in fixtures {
            let predicate =
                Predicate::NamedNode(NamedNode::new(predicate_iri).expect("valid IRI"));
            let results = index
                .query(None, Some(&predicate), None)
                .expect("index query should succeed");
            assert_eq!(results.len(), count);
        }
    }

    /// A subject-predicate index returns every object stored for the pair.
    #[test]
    fn test_subject_predicate_index() {
        let graph = Arc::new(RwLock::new(IndexedGraph::new()));
        let s1 = Subject::NamedNode(NamedNode::new("http://s1").expect("valid IRI"));
        let p1 = Predicate::NamedNode(NamedNode::new("http://p1").expect("valid IRI"));

        for i in 0..5 {
            let object = Object::NamedNode(
                NamedNode::new(format!("http://o{i}")).expect("valid IRI from format"),
            );
            let triple = Triple::new(s1.clone(), p1.clone(), object);
            graph.write().insert(&triple);
        }

        let index = SubjectPredicateIndex::new(graph.clone());
        let results = index
            .query(Some(&s1), Some(&p1), None)
            .expect("index query should succeed");
        assert_eq!(results.len(), 5);
    }
}