use crate::concurrent::lock_free_graph::ConcurrentGraph;
use crate::model::{Object, Predicate, Subject, Triple};
use crate::simd_triple_matching::SimdTripleMatcher;
use crate::OxirsError;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Read-optimized front end over a shared [`ConcurrentGraph`], tracking read
/// statistics and exposing bulk, pattern, range, and streaming access paths.
pub struct OptimizedReader {
    // Shared lock-free triple store that every read operation delegates to.
    graph: Arc<ConcurrentGraph>,
    // Reserved for SIMD-accelerated filtering of large candidate sets; the
    // current `simd_filter_large` is a passthrough, hence the allow.
    #[allow(dead_code)]
    simd_matcher: SimdTripleMatcher,
    // Total number of read operations issued through this reader.
    read_count: AtomicU64,
    // Cache-hit counter; in this file it is only read and reset — presumably
    // incremented by a caching layer elsewhere (TODO confirm).
    cache_hits: AtomicU64,
    // Cache-miss counter; same caveat as `cache_hits`.
    cache_misses: AtomicU64,
}
impl OptimizedReader {
    /// Candidate-set size above which `pattern_match_optimized` routes
    /// results through the SIMD filtering path.
    const SIMD_THRESHOLD: usize = 1000;

    /// Creates a reader over `graph` with all statistics counters at zero.
    pub fn new(graph: Arc<ConcurrentGraph>) -> Self {
        Self {
            graph,
            simd_matcher: SimdTripleMatcher::new(),
            read_count: AtomicU64::new(0),
            cache_hits: AtomicU64::new(0),
            cache_misses: AtomicU64::new(0),
        }
    }

    /// Reads up to `max_count` triples in graph iteration order.
    ///
    /// When the result fills the requested chunk exactly, a prefetch hint is
    /// issued for the next chunk on the assumption that more data follows.
    pub fn bulk_read(&self, max_count: usize) -> Result<Vec<Triple>, OxirsError> {
        self.read_count.fetch_add(1, Ordering::Relaxed);
        let triples: Vec<Triple> = self.graph.iter().take(max_count).collect();
        // Guard on `max_count > 0`: without it, an empty request (0 == 0)
        // would issue a spurious prefetch hint.
        if max_count > 0 && triples.len() == max_count {
            self.prefetch_next_chunk(max_count);
        }
        Ok(triples)
    }

    /// Matches triples against an optional (subject, predicate, object)
    /// pattern; `None` components act as wildcards.
    ///
    /// Candidate sets larger than [`Self::SIMD_THRESHOLD`] are passed through
    /// the SIMD filtering path.
    pub fn pattern_match_optimized(
        &self,
        subject: Option<&Subject>,
        predicate: Option<&Predicate>,
        object: Option<&Object>,
    ) -> Result<Vec<Triple>, OxirsError> {
        self.read_count.fetch_add(1, Ordering::Relaxed);
        let candidates = self.graph.match_pattern(subject, predicate, object);
        if candidates.len() > Self::SIMD_THRESHOLD {
            self.simd_filter_large(&candidates)
        } else {
            Ok(candidates)
        }
    }

    /// Creates a chunked streaming cursor over the graph, starting at the
    /// beginning of iteration order.
    pub fn streaming_read(&self, chunk_size: usize) -> StreamingReader<'_> {
        StreamingReader {
            reader: self,
            chunk_size,
            position: 0,
            current_chunk: Vec::new(),
        }
    }

    /// Reads up to `max_count` triples whose subject lies within the
    /// inclusive `(start, end)` bounds of `subject_range`.
    pub fn range_read(
        &self,
        subject_range: (Subject, Subject),
        max_count: usize,
    ) -> Result<Vec<Triple>, OxirsError> {
        self.read_count.fetch_add(1, Ordering::Relaxed);
        let (start, end) = subject_range;
        let triples: Vec<Triple> = self
            .graph
            .iter()
            .filter(|t| {
                let s = t.subject();
                s >= &start && s <= &end
            })
            .take(max_count)
            .collect();
        Ok(triples)
    }

    /// Counts triples matching the optional pattern without returning them.
    pub fn count_matching(
        &self,
        subject: Option<&Subject>,
        predicate: Option<&Predicate>,
        object: Option<&Object>,
    ) -> usize {
        self.read_count.fetch_add(1, Ordering::Relaxed);
        self.graph.match_pattern(subject, predicate, object).len()
    }

    /// Returns whether `triple` is present in the graph.
    pub fn exists(&self, triple: &Triple) -> bool {
        self.read_count.fetch_add(1, Ordering::Relaxed);
        self.graph.contains(triple)
    }

    /// Reads the triples at the given `positions` (indices into the graph's
    /// iteration order), silently skipping out-of-range indices.
    ///
    /// Before copying each entry, the entry at the *next* requested position
    /// is touched as a software prefetch hint.
    pub fn prefetched_read(&self, positions: &[usize]) -> Result<Vec<Triple>, OxirsError> {
        self.read_count.fetch_add(1, Ordering::Relaxed);
        // Snapshot once; all positions index into this materialized view.
        let all_triples: Vec<Triple> = self.graph.iter().collect();
        let mut result = Vec::with_capacity(positions.len());
        for (idx, &pos) in positions.iter().enumerate() {
            if pos < all_triples.len() {
                // `idx + 1` is the next requested position. The previous
                // implementation re-searched `positions` linearly for `pos`
                // on every iteration (O(n^2) overall) and picked the wrong
                // successor whenever `positions` contained duplicates.
                if let Some(&next_pos) = positions.get(idx + 1) {
                    if next_pos < all_triples.len() {
                        std::hint::black_box(&all_triples[next_pos]);
                    }
                }
                result.push(all_triples[pos].clone());
            }
        }
        Ok(result)
    }

    /// Returns a snapshot of the reader's statistics counters.
    ///
    /// Counters are read with relaxed ordering, so the snapshot is not a
    /// single atomic observation across all four values.
    pub fn read_stats(&self) -> ReadStats {
        ReadStats {
            total_reads: self.read_count.load(Ordering::Relaxed),
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            hit_rate: self.calculate_hit_rate(),
        }
    }

    /// Resets all statistics counters to zero.
    pub fn reset_stats(&self) {
        self.read_count.store(0, Ordering::Relaxed);
        self.cache_hits.store(0, Ordering::Relaxed);
        self.cache_misses.store(0, Ordering::Relaxed);
    }

    /// Prefetch hint for the chunk following `_offset`. Currently a no-op
    /// kept opaque to the optimizer via `black_box`.
    fn prefetch_next_chunk(&self, _offset: usize) {
        std::hint::black_box(());
    }

    /// SIMD filtering path for large candidate sets. Currently a passthrough
    /// copy; `simd_matcher` is reserved for the real implementation.
    fn simd_filter_large(&self, triples: &[Triple]) -> Result<Vec<Triple>, OxirsError> {
        Ok(triples.to_vec())
    }

    /// Cache hit rate in `[0.0, 1.0]`; `0.0` when no cache events have been
    /// recorded (avoids 0/0).
    fn calculate_hit_rate(&self) -> f64 {
        let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
        let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
        let total = hits + misses;
        if total > 0.0 {
            hits / total
        } else {
            0.0
        }
    }
}
/// Chunked cursor over an [`OptimizedReader`]'s graph, created by
/// [`OptimizedReader::streaming_read`].
pub struct StreamingReader<'a> {
    // Reader whose graph is iterated; the cursor borrows it for `'a`.
    reader: &'a OptimizedReader,
    // Maximum number of triples returned per `next_chunk` call.
    chunk_size: usize,
    // Number of triples already consumed from the front of iteration order.
    position: usize,
    // Copy of the most recently fetched chunk (written by `next_chunk`,
    // cleared by `reset`; never read elsewhere in this file).
    current_chunk: Vec<Triple>,
}
impl<'a> StreamingReader<'a> {
    /// Fetches the next chunk of at most `chunk_size` triples and advances
    /// the cursor past the triples returned.
    ///
    /// A copy of the chunk is retained in `current_chunk`.
    pub fn next_chunk(&mut self) -> Result<Vec<Triple>, OxirsError> {
        let chunk: Vec<Triple> = self
            .reader
            .graph
            .iter()
            .skip(self.position)
            .take(self.chunk_size)
            .collect();
        self.position += chunk.len();
        self.current_chunk = chunk.clone();
        Ok(chunk)
    }

    /// Returns `true` while the cursor has not reached the end of the graph.
    pub fn has_more(&self) -> bool {
        self.reader.graph.len() > self.position
    }

    /// Rewinds the cursor to the beginning and discards the buffered chunk.
    pub fn reset(&mut self) {
        self.current_chunk.clear();
        self.position = 0;
    }
}
/// Point-in-time snapshot of reader statistics, as returned by
/// [`OptimizedReader::read_stats`].
#[derive(Debug, Clone, Copy)]
pub struct ReadStats {
    /// Total read operations performed.
    pub total_reads: u64,
    /// Reads served from cache.
    pub cache_hits: u64,
    /// Reads that missed the cache.
    pub cache_misses: u64,
    /// `cache_hits / (cache_hits + cache_misses)`, or `0.0` with no samples.
    pub hit_rate: f64,
}

impl ReadStats {
    /// A reader is considered performant when its hit rate strictly exceeds
    /// the 80% threshold.
    pub fn is_performant(&self) -> bool {
        self.hit_rate > 0.8
    }

    /// Hit rate expressed as a percentage in `[0.0, 100.0]`.
    pub fn efficiency_percent(&self) -> f64 {
        100.0 * self.hit_rate
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Builds a reader backed by a fresh, empty concurrent graph.
    fn empty_reader() -> OptimizedReader {
        OptimizedReader::new(Arc::new(ConcurrentGraph::new()))
    }

    #[test]
    fn test_optimized_reader_creation() {
        let reader = empty_reader();
        assert_eq!(reader.read_stats().total_reads, 0);
    }

    #[test]
    fn test_bulk_read_empty() {
        let reader = empty_reader();
        let triples = reader.bulk_read(100).expect("operation should succeed");
        assert!(triples.is_empty());
    }

    #[test]
    fn test_read_stats() {
        let reader = empty_reader();
        let _ = reader.bulk_read(10);
        let _ = reader.bulk_read(20);
        // Each bulk_read bumps the counter exactly once.
        assert_eq!(reader.read_stats().total_reads, 2);
    }

    #[test]
    fn test_streaming_reader() {
        let reader = empty_reader();
        let stream = reader.streaming_read(10);
        assert!(!stream.has_more());
    }

    #[test]
    fn test_exists_operation() {
        let reader = empty_reader();
        let subject =
            Subject::NamedNode(NamedNode::new("http://example.org/s").expect("valid IRI"));
        let predicate =
            Predicate::NamedNode(NamedNode::new("http://example.org/p").expect("valid IRI"));
        let object = Object::Literal(Literal::new("test"));
        let triple = Triple::new(subject, predicate, object);
        assert!(!reader.exists(&triple));
    }

    #[test]
    fn test_count_matching() {
        let reader = empty_reader();
        assert_eq!(reader.count_matching(None, None, None), 0);
    }
}