use crate::model::*;
use ahash::RandomState;
use bumpalo::Bump;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum QueryPattern {
FullScan,
SubjectOnly,
PredicateOnly,
ObjectOnly,
GraphOnly,
SubjectPredicate,
SubjectObject,
PredicateObject,
SubjectPredicateObject,
FullMatch,
}
#[derive(Debug, Clone)]
pub struct MemoryUsage {
pub total_bytes: usize,
pub heap_bytes: usize,
pub index_bytes: usize,
pub arena_bytes: usize,
}
#[derive(Debug)]
pub struct UltraIndex {
subject_index: DashMap<Subject, BTreeSet<u64>, RandomState>,
predicate_index: DashMap<Predicate, BTreeSet<u64>, RandomState>,
object_index: DashMap<Object, BTreeSet<u64>, RandomState>,
graph_index: DashMap<GraphName, BTreeSet<u64>, RandomState>,
quad_storage: DashMap<u64, Quad, RandomState>,
next_id: AtomicUsize,
arena: Arc<std::sync::Mutex<Bump>>,
stats: Arc<IndexStats>,
}
impl Default for UltraIndex {
fn default() -> Self {
Self::new()
}
}
impl UltraIndex {
pub fn new() -> Self {
Self {
subject_index: DashMap::default(),
predicate_index: DashMap::default(),
object_index: DashMap::default(),
graph_index: DashMap::default(),
quad_storage: DashMap::default(),
next_id: AtomicUsize::new(0),
arena: Arc::new(std::sync::Mutex::new(Bump::new())),
stats: Arc::new(IndexStats::default()),
}
}
pub fn insert_quad(&self, quad: &Quad) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed) as u64;
self.quad_storage.insert(id, quad.clone());
self.subject_index
.entry(quad.subject().clone())
.or_default()
.insert(id);
self.predicate_index
.entry(quad.predicate().clone())
.or_default()
.insert(id);
self.object_index
.entry(quad.object().clone())
.or_default()
.insert(id);
self.graph_index
.entry(quad.graph_name().clone())
.or_default()
.insert(id);
self.stats.insertions.fetch_add(1, Ordering::Relaxed);
id
}
pub fn bulk_insert_quads(&self, quads: &[Quad]) -> Vec<u64> {
quads.iter().map(|quad| self.insert_quad(quad)).collect()
}
pub fn find_quads(
&self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
graph_name: Option<&GraphName>,
) -> Vec<Quad> {
self.stats.lookups.fetch_add(1, Ordering::Relaxed);
let mut result_ids: Option<HashSet<u64>> = None;
if let Some(s) = subject {
match self.subject_index.get(s) {
Some(ids_set) => {
let ids: HashSet<u64> = ids_set.iter().cloned().collect();
result_ids = Some(match result_ids {
Some(existing) => existing.intersection(&ids).cloned().collect(),
None => ids,
});
}
_ => {
return Vec::new(); }
}
}
if let Some(p) = predicate {
match self.predicate_index.get(p) {
Some(ids_set) => {
let ids: HashSet<u64> = ids_set.iter().cloned().collect();
result_ids = Some(match result_ids {
Some(existing) => existing.intersection(&ids).cloned().collect(),
None => ids,
});
}
_ => {
return Vec::new(); }
}
}
if let Some(o) = object {
match self.object_index.get(o) {
Some(ids_set) => {
let ids: HashSet<u64> = ids_set.iter().cloned().collect();
result_ids = Some(match result_ids {
Some(existing) => existing.intersection(&ids).cloned().collect(),
None => ids,
});
}
_ => {
return Vec::new(); }
}
}
if let Some(g) = graph_name {
match self.graph_index.get(g) {
Some(ids_set) => {
let ids: HashSet<u64> = ids_set.iter().cloned().collect();
result_ids = Some(match result_ids {
Some(existing) => existing.intersection(&ids).cloned().collect(),
None => ids,
});
}
_ => {
return Vec::new(); }
}
}
let final_ids = result_ids
.unwrap_or_else(|| self.quad_storage.iter().map(|entry| *entry.key()).collect());
let mut results = Vec::new();
for id in final_ids {
if let Some(quad) = self.quad_storage.get(&id) {
results.push(quad.clone());
}
}
results
}
pub fn len(&self) -> usize {
self.quad_storage.len()
}
pub fn is_empty(&self) -> bool {
self.quad_storage.is_empty()
}
pub fn stats(&self) -> Arc<IndexStats> {
Arc::clone(&self.stats)
}
pub fn memory_usage(&self) -> MemoryUsage {
let arena_usage = {
match self.arena.lock() {
Ok(arena) => arena.allocated_bytes(),
_ => 0,
}
};
let index_usage = self.subject_index.len() * 64
+ self.predicate_index.len() * 64
+ self.object_index.len() * 64
+ self.graph_index.len() * 64
+ self.quad_storage.len() * 128;
MemoryUsage {
total_bytes: arena_usage + index_usage,
heap_bytes: index_usage,
index_bytes: index_usage,
arena_bytes: arena_usage,
}
}
pub fn remove_quad(&self, quad: &Quad) -> bool {
let matching_quads = self.find_quads(
Some(quad.subject()),
Some(quad.predicate()),
Some(quad.object()),
Some(quad.graph_name()),
);
for existing_quad in matching_quads {
if existing_quad == *quad {
for entry in self.quad_storage.iter() {
if entry.value() == quad {
let id = *entry.key();
self.quad_storage.remove(&id);
return true;
}
}
}
}
false
}
pub fn clear(&self) {
self.quad_storage.clear();
self.subject_index.clear();
self.predicate_index.clear();
self.object_index.clear();
self.graph_index.clear();
self.next_id.store(0, Ordering::Relaxed);
}
pub fn clear_arena(&self) {
if let Ok(mut arena) = self.arena.lock() {
arena.reset();
}
}
}
#[derive(Debug, Default)]
pub struct IndexStats {
pub insertions: AtomicUsize,
pub deletions: AtomicUsize,
pub lookups: AtomicUsize,
pub cache_hits: AtomicUsize,
pub cache_misses: AtomicUsize,
pub simd_operations: AtomicUsize,
}
impl Clone for IndexStats {
fn clone(&self) -> Self {
IndexStats {
insertions: AtomicUsize::new(self.insertions.load(Ordering::Relaxed)),
deletions: AtomicUsize::new(self.deletions.load(Ordering::Relaxed)),
lookups: AtomicUsize::new(self.lookups.load(Ordering::Relaxed)),
cache_hits: AtomicUsize::new(self.cache_hits.load(Ordering::Relaxed)),
cache_misses: AtomicUsize::new(self.cache_misses.load(Ordering::Relaxed)),
simd_operations: AtomicUsize::new(self.simd_operations.load(Ordering::Relaxed)),
}
}
}
impl IndexStats {
pub fn new() -> Self {
Default::default()
}
pub fn hit_ratio(&self) -> f64 {
let hits = self.cache_hits.load(Ordering::Relaxed);
let total = hits + self.cache_misses.load(Ordering::Relaxed);
if total == 0 {
0.0
} else {
hits as f64 / total as f64
}
}
pub fn operations_per_second(&self, duration_secs: f64) -> f64 {
let total_ops = self.insertions.load(Ordering::Relaxed)
+ self.deletions.load(Ordering::Relaxed)
+ self.lookups.load(Ordering::Relaxed);
total_ops as f64 / duration_secs
}
}
impl QueryPattern {
pub fn from_query(
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
graph_name: Option<&GraphName>,
) -> Self {
match (
subject.is_some(),
predicate.is_some(),
object.is_some(),
graph_name.is_some(),
) {
(false, false, false, false) => QueryPattern::FullScan,
(true, false, false, false) => QueryPattern::SubjectOnly,
(false, true, false, false) => QueryPattern::PredicateOnly,
(false, false, true, false) => QueryPattern::ObjectOnly,
(false, false, false, true) => QueryPattern::GraphOnly,
(true, true, false, _) => QueryPattern::SubjectPredicate,
(true, false, true, _) => QueryPattern::SubjectObject,
(false, true, true, _) => QueryPattern::PredicateObject,
(true, true, true, false) => QueryPattern::SubjectPredicateObject,
(true, true, true, true) => QueryPattern::FullMatch,
_ => QueryPattern::FullScan, }
}
pub fn estimated_selectivity(&self) -> f64 {
match self {
QueryPattern::FullMatch => 0.0001,
QueryPattern::SubjectPredicateObject => 0.001,
QueryPattern::SubjectPredicate => 0.01,
QueryPattern::SubjectObject => 0.05,
QueryPattern::PredicateObject => 0.1,
QueryPattern::SubjectOnly => 0.2,
QueryPattern::PredicateOnly => 0.3,
QueryPattern::ObjectOnly => 0.4,
QueryPattern::GraphOnly => 0.5,
QueryPattern::FullScan => 1.0,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexType {
BTree,
Hash,
Composite,
FullText,
}
#[derive(Debug, Clone)]
pub struct IndexConfig {
pub auto_create_indexes: bool,
pub max_indexes: usize,
pub min_query_frequency: usize,
pub collect_stats: bool,
pub memory_budget: usize,
}
impl Default for IndexConfig {
fn default() -> Self {
IndexConfig {
auto_create_indexes: true,
max_indexes: 20,
min_query_frequency: 10,
collect_stats: true,
memory_budget: 100 * 1024 * 1024, }
}
}
#[derive(Debug)]
pub struct QuadIndex {
index_type: IndexType,
btree_indexes: BTreeMap<IndexKey, BTreeSet<Quad>>,
hash_indexes: HashMap<IndexKey, BTreeSet<Quad>>,
stats: IndexStats,
last_access: std::time::Instant,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IndexKey {
Single(Term),
Composite(Vec<Term>),
Text(String),
}
impl QuadIndex {
pub fn new(index_type: IndexType) -> Self {
QuadIndex {
index_type,
btree_indexes: BTreeMap::new(),
hash_indexes: HashMap::new(),
stats: IndexStats::default(),
last_access: std::time::Instant::now(),
}
}
pub fn insert(&mut self, quad: &Quad, key: IndexKey) {
match self.index_type {
IndexType::BTree | IndexType::Composite => {
self.btree_indexes
.entry(key)
.or_default()
.insert(quad.clone());
}
IndexType::Hash => {
self.hash_indexes
.entry(key)
.or_default()
.insert(quad.clone());
}
IndexType::FullText => {
self.btree_indexes
.entry(key)
.or_default()
.insert(quad.clone());
}
}
}
pub fn remove(&mut self, quad: &Quad, key: &IndexKey) {
match self.index_type {
IndexType::BTree | IndexType::Composite | IndexType::FullText => {
if let Some(quads) = self.btree_indexes.get_mut(key) {
quads.remove(quad);
if quads.is_empty() {
self.btree_indexes.remove(key);
}
}
}
IndexType::Hash => {
if let Some(quads) = self.hash_indexes.get_mut(key) {
quads.remove(quad);
if quads.is_empty() {
self.hash_indexes.remove(key);
}
}
}
}
}
pub fn query(&mut self, key: &IndexKey) -> Option<&BTreeSet<Quad>> {
self.last_access = std::time::Instant::now();
match self.index_type {
IndexType::BTree | IndexType::Composite | IndexType::FullText => {
self.btree_indexes.get(key)
}
IndexType::Hash => self.hash_indexes.get(key),
}
}
pub fn memory_usage(&self) -> usize {
let btree_size = self.btree_indexes.len() * 64; let hash_size = self.hash_indexes.len() * 32; btree_size + hash_size
}
pub fn stats(&self) -> &IndexStats {
&self.stats
}
}
#[derive(Debug)]
pub struct IndexManager {
config: IndexConfig,
indexes: HashMap<String, QuadIndex>,
global_stats: Arc<RwLock<IndexStats>>,
query_frequency: HashMap<QueryPattern, usize>,
}
impl IndexManager {
pub fn new(config: IndexConfig) -> Self {
IndexManager {
config,
indexes: HashMap::new(),
global_stats: Arc::new(RwLock::new(IndexStats::default())),
query_frequency: HashMap::new(),
}
}
pub fn create_subject_index(&mut self) {
let index = QuadIndex::new(IndexType::BTree);
self.indexes.insert("subject".to_string(), index);
}
pub fn create_predicate_index(&mut self) {
let index = QuadIndex::new(IndexType::Hash);
self.indexes.insert("predicate".to_string(), index);
}
pub fn create_object_index(&mut self) {
let index = QuadIndex::new(IndexType::BTree);
self.indexes.insert("object".to_string(), index);
}
pub fn create_subject_predicate_index(&mut self) {
let index = QuadIndex::new(IndexType::Composite);
self.indexes.insert("subject_predicate".to_string(), index);
}
pub fn create_fulltext_index(&mut self) {
let index = QuadIndex::new(IndexType::FullText);
self.indexes.insert("fulltext".to_string(), index);
}
pub fn insert_quad(&mut self, quad: &Quad) {
if let Some(index) = self.indexes.get_mut("subject") {
let key = IndexKey::Single(Term::from_subject(quad.subject()));
index.insert(quad, key);
}
if let Some(index) = self.indexes.get_mut("predicate") {
let key = IndexKey::Single(Term::from_predicate(quad.predicate()));
index.insert(quad, key);
}
if let Some(index) = self.indexes.get_mut("object") {
let key = IndexKey::Single(Term::from_object(quad.object()));
index.insert(quad, key);
}
if let Some(index) = self.indexes.get_mut("subject_predicate") {
let key = IndexKey::Composite(vec![
Term::from_subject(quad.subject()),
Term::from_predicate(quad.predicate()),
]);
index.insert(quad, key);
}
if let Object::Literal(literal) = quad.object() {
if let Some(index) = self.indexes.get_mut("fulltext") {
let key = IndexKey::Text(literal.value().to_string());
index.insert(quad, key);
}
}
}
pub fn remove_quad(&mut self, quad: &Quad) {
if let Some(index) = self.indexes.get_mut("subject") {
let key = IndexKey::Single(Term::from_subject(quad.subject()));
index.remove(quad, &key);
}
if let Some(index) = self.indexes.get_mut("predicate") {
let key = IndexKey::Single(Term::from_predicate(quad.predicate()));
index.remove(quad, &key);
}
if let Some(index) = self.indexes.get_mut("object") {
let key = IndexKey::Single(Term::from_object(quad.object()));
index.remove(quad, &key);
}
if let Some(index) = self.indexes.get_mut("subject_predicate") {
let key = IndexKey::Composite(vec![
Term::from_subject(quad.subject()),
Term::from_predicate(quad.predicate()),
]);
index.remove(quad, &key);
}
if let Object::Literal(literal) = quad.object() {
if let Some(index) = self.indexes.get_mut("fulltext") {
let key = IndexKey::Text(literal.value().to_string());
index.remove(quad, &key);
}
}
}
pub fn query_optimized(
&mut self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
graph_name: Option<&GraphName>,
) -> Option<Vec<Quad>> {
let pattern = QueryPattern::from_query(subject, predicate, object, graph_name);
if self.config.collect_stats {
*self.query_frequency.entry(pattern.clone()).or_insert(0) += 1;
}
if self.config.auto_create_indexes {
self.consider_auto_index_creation(&pattern);
}
let start_time = std::time::Instant::now();
let result = self.execute_query_with_indexes(subject, predicate, object, graph_name);
let _execution_time = start_time.elapsed().as_micros() as u64;
if self.config.collect_stats {
let stats = self.global_stats.read();
stats.lookups.fetch_add(1, Ordering::Relaxed);
if result.is_some() {
stats.cache_hits.fetch_add(1, Ordering::Relaxed);
} else {
stats.cache_misses.fetch_add(1, Ordering::Relaxed);
}
}
result
}
fn execute_query_with_indexes(
&mut self,
subject: Option<&Subject>,
predicate: Option<&Predicate>,
object: Option<&Object>,
_graph_name: Option<&GraphName>,
) -> Option<Vec<Quad>> {
if let (Some(s), Some(p)) = (subject, predicate) {
if let Some(index) = self.indexes.get_mut("subject_predicate") {
let key = IndexKey::Composite(vec![Term::from_subject(s), Term::from_predicate(p)]);
if let Some(quads) = index.query(&key) {
let mut result: Vec<Quad> = quads.iter().cloned().collect();
if let Some(o) = object {
result.retain(|quad| quad.object() == o);
}
return Some(result);
}
}
}
if let Some(s) = subject {
if let Some(index) = self.indexes.get_mut("subject") {
let key = IndexKey::Single(Term::from_subject(s));
if let Some(quads) = index.query(&key) {
let mut result: Vec<Quad> = quads.iter().cloned().collect();
if let Some(p) = predicate {
result.retain(|quad| quad.predicate() == p);
}
if let Some(o) = object {
result.retain(|quad| quad.object() == o);
}
return Some(result);
}
}
}
if let Some(p) = predicate {
if let Some(index) = self.indexes.get_mut("predicate") {
let key = IndexKey::Single(Term::from_predicate(p));
if let Some(quads) = index.query(&key) {
let mut result: Vec<Quad> = quads.iter().cloned().collect();
if let Some(s) = subject {
result.retain(|quad| quad.subject() == s);
}
if let Some(o) = object {
result.retain(|quad| quad.object() == o);
}
return Some(result);
}
}
}
if let Some(o) = object {
if let Some(index) = self.indexes.get_mut("object") {
let key = IndexKey::Single(Term::from_object(o));
if let Some(quads) = index.query(&key) {
let mut result: Vec<Quad> = quads.iter().cloned().collect();
if let Some(s) = subject {
result.retain(|quad| quad.subject() == s);
}
if let Some(p) = predicate {
result.retain(|quad| quad.predicate() == p);
}
return Some(result);
}
}
}
None
}
fn consider_auto_index_creation(&mut self, pattern: &QueryPattern) {
let frequency = self.query_frequency.get(pattern).copied().unwrap_or(0);
if frequency >= self.config.min_query_frequency {
match pattern {
QueryPattern::SubjectOnly if !self.indexes.contains_key("subject") => {
self.create_subject_index();
}
QueryPattern::PredicateOnly if !self.indexes.contains_key("predicate") => {
self.create_predicate_index();
}
QueryPattern::ObjectOnly if !self.indexes.contains_key("object") => {
self.create_object_index();
}
QueryPattern::SubjectPredicate
if !self.indexes.contains_key("subject_predicate") =>
{
self.create_subject_predicate_index();
}
_ => {} }
}
}
pub fn stats(&self) -> IndexStats {
(*self.global_stats.read()).clone()
}
pub fn total_memory_usage(&self) -> usize {
self.indexes.values().map(|idx| idx.memory_usage()).sum()
}
pub fn maintenance(&mut self) {
if self.total_memory_usage() > self.config.memory_budget {
self.evict_least_used_indexes();
}
self.update_selectivity_stats();
}
fn evict_least_used_indexes(&mut self) {
let mut indexes_by_access: Vec<_> = self
.indexes
.iter()
.map(|(name, index)| (name.clone(), index.last_access))
.collect();
indexes_by_access.sort_by_key(|(_, access_time)| *access_time);
while self.total_memory_usage() > self.config.memory_budget && !indexes_by_access.is_empty()
{
if let Some((name, _)) = indexes_by_access.pop() {
self.indexes.remove(&name);
}
}
}
fn update_selectivity_stats(&mut self) {
}
}
impl Default for IndexManager {
fn default() -> Self {
let mut manager = Self::new(IndexConfig::default());
manager.create_subject_index();
manager.create_predicate_index();
manager.create_object_index();
manager
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;
fn create_test_quad() -> Quad {
let subject = NamedNode::new("http://example.org/subject").expect("valid IRI");
let predicate = NamedNode::new("http://example.org/predicate").expect("valid IRI");
let object = Literal::new("test object");
let graph = NamedNode::new("http://example.org/graph").expect("valid IRI");
Quad::new(subject, predicate, object, graph)
}
#[test]
fn test_query_pattern_classification() {
let subject =
Subject::NamedNode(NamedNode::new("http://example.org/s").expect("valid IRI"));
let predicate =
Predicate::NamedNode(NamedNode::new("http://example.org/p").expect("valid IRI"));
let object = Object::Literal(Literal::new("o"));
let graph =
GraphName::NamedNode(NamedNode::new("http://example.org/g").expect("valid IRI"));
assert_eq!(
QueryPattern::from_query(None, None, None, None),
QueryPattern::FullScan
);
assert_eq!(
QueryPattern::from_query(Some(&subject), None, None, None),
QueryPattern::SubjectOnly
);
assert_eq!(
QueryPattern::from_query(Some(&subject), Some(&predicate), None, None),
QueryPattern::SubjectPredicate
);
assert_eq!(
QueryPattern::from_query(
Some(&subject),
Some(&predicate),
Some(&object),
Some(&graph)
),
QueryPattern::FullMatch
);
}
#[test]
fn test_selectivity_estimates() {
assert!(
QueryPattern::FullMatch.estimated_selectivity()
< QueryPattern::SubjectPredicate.estimated_selectivity()
);
assert!(
QueryPattern::SubjectPredicate.estimated_selectivity()
< QueryPattern::SubjectOnly.estimated_selectivity()
);
assert!(
QueryPattern::SubjectOnly.estimated_selectivity()
< QueryPattern::FullScan.estimated_selectivity()
);
}
#[test]
fn test_quad_index_operations() {
let mut index = QuadIndex::new(IndexType::BTree);
let quad = create_test_quad();
let key = IndexKey::Single(Term::from_subject(quad.subject()));
index.insert(&quad, key.clone());
assert_eq!(index.query(&key).expect("query should succeed").len(), 1);
index.remove(&quad, &key);
assert!(
index.query(&key).is_none()
|| index.query(&key).expect("query should succeed").is_empty()
);
}
#[test]
fn test_index_manager_creation() {
let manager = IndexManager::default();
assert!(manager.indexes.contains_key("subject"));
assert!(manager.indexes.contains_key("predicate"));
assert!(manager.indexes.contains_key("object"));
}
#[test]
fn test_index_manager_quad_operations() {
let mut manager = IndexManager::default();
let quad = create_test_quad();
manager.insert_quad(&quad);
let subject = quad.subject();
let results = manager.query_optimized(Some(subject), None, None, None);
assert!(results.is_some());
assert_eq!(results.expect("query results should be available").len(), 1);
}
#[test]
fn test_composite_index_creation() {
let mut manager = IndexManager::default();
manager.create_subject_predicate_index();
assert!(manager.indexes.contains_key("subject_predicate"));
}
#[test]
fn test_auto_index_creation() {
let config = IndexConfig {
min_query_frequency: 2,
..Default::default()
};
let mut manager = IndexManager::new(config);
let subject =
Subject::NamedNode(NamedNode::new("http://example.org/s").expect("valid IRI"));
for _ in 0..3 {
manager.query_optimized(Some(&subject), None, None, None);
}
assert!(manager.indexes.contains_key("subject"));
}
#[test]
fn test_index_memory_usage() {
let mut manager = IndexManager::default();
let initial_memory = manager.total_memory_usage();
for i in 0..100 {
let subject = NamedNode::new(format!("http://example.org/subject{i}"))
.expect("valid IRI from format");
let predicate = NamedNode::new("http://example.org/predicate").expect("valid IRI");
let object = Literal::new(format!("object{i}"));
let quad = Quad::new(
subject,
predicate,
object,
NamedNode::new("http://example.org/graph").expect("valid IRI"),
);
manager.insert_quad(&quad);
}
let final_memory = manager.total_memory_usage();
assert!(final_memory > initial_memory);
}
#[test]
fn test_statistics_collection() {
let mut manager = IndexManager::default();
let subject =
Subject::NamedNode(NamedNode::new("http://example.org/s").expect("valid IRI"));
manager.query_optimized(Some(&subject), None, None, None);
manager.query_optimized(None, None, None, None);
let stats = manager.stats();
assert_eq!(stats.lookups.load(Ordering::Relaxed), 2);
}
}