use crate::{StarError, StarResult, StarTerm, StarTriple};
use scirs2_core::memory_efficient::{create_mmap, AccessMode, MemoryMappedArray};
use scirs2_core::ndarray_ext::Array1;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
/// Disk-backed RDF-star triple store that memory-maps its serialized
/// triple data and keeps chunked in-memory indexes over it.
pub struct MemoryEfficientStore {
    /// Directory holding the store's files (e.g. `triples.bin`).
    base_path: PathBuf,
    /// Memory-mapped serialized triple bytes; `None` until the first insert.
    triple_data: Option<MemoryMappedArray<u8>>,
    /// Per-chunk subject/predicate/object lookup indexes.
    index: ChunkedIndex,
    /// Running usage counters exposed via `statistics()`.
    stats: StoreStatistics,
    /// Tuning knobs (chunk size, memory budget, compression, access mode).
    config: StoreConfig,
}
/// Tuning parameters for a memory-efficient store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoreConfig {
    /// Number of triples per index chunk (default: 10000).
    pub chunk_size: usize,
    /// Soft memory budget in bytes (default: 1 GiB).
    // NOTE(review): not enforced anywhere in this file — confirm intended use.
    pub max_memory: usize,
    /// Whether on-disk data should be compressed.
    // NOTE(review): no compression is applied in this file — flag only.
    pub enable_compression: bool,
    /// Access mode requested for the memory-mapped data file.
    pub access_mode: MappedAccessMode,
}
/// Serializable mirror of the memory-map access modes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MappedAccessMode {
    /// Map is read-only.
    ReadOnly,
    /// Map may be both read and written.
    ReadWrite,
    /// Writes stay private to this mapping (copy-on-write).
    CopyOnWrite,
}
impl Default for StoreConfig {
fn default() -> Self {
Self {
chunk_size: 10000,
max_memory: 1 << 30, enable_compression: true,
access_mode: MappedAccessMode::ReadWrite,
}
}
}
/// Usage counters maintained by the store.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StoreStatistics {
    /// Total number of triples inserted so far.
    pub total_triples: usize,
    /// Size in bytes of the active memory map.
    pub memory_usage: usize,
    /// Cumulative bytes of serialized triple data written.
    pub disk_usage: usize,
    /// Cache hit count.
    // NOTE(review): never updated in this file — presumably for a future cache.
    pub cache_hits: usize,
    /// Cache miss count.
    // NOTE(review): never updated in this file.
    pub cache_misses: usize,
}
/// Per-chunk indexes mapping a term key to positions (within that chunk) of
/// the triples that use the term in that role. The three vectors are kept the
/// same length; chunk `i` of each covers the same batch of triples.
struct ChunkedIndex {
    subject_chunks: Vec<HashMap<String, Vec<usize>>>,
    predicate_chunks: Vec<HashMap<String, Vec<usize>>>,
    object_chunks: Vec<HashMap<String, Vec<usize>>>,
}
impl ChunkedIndex {
fn new() -> Self {
Self {
subject_chunks: Vec::new(),
predicate_chunks: Vec::new(),
object_chunks: Vec::new(),
}
}
fn add_chunk(&mut self) {
self.subject_chunks.push(HashMap::new());
self.predicate_chunks.push(HashMap::new());
self.object_chunks.push(HashMap::new());
}
}
impl MemoryEfficientStore {
    /// Opens (or creates) a store rooted at `path` with the default config.
    ///
    /// # Errors
    /// Returns a resource error if the backing directory cannot be created.
    pub fn new<P: AsRef<Path>>(path: P) -> StarResult<Self> {
        Self::with_config(path, StoreConfig::default())
    }

    /// Opens (or creates) a store rooted at `path` with an explicit `config`,
    /// creating the backing directory when it does not yet exist.
    ///
    /// # Errors
    /// Returns a resource error if the backing directory cannot be created.
    pub fn with_config<P: AsRef<Path>>(path: P, config: StoreConfig) -> StarResult<Self> {
        let base_path = path.as_ref().to_path_buf();
        if !base_path.exists() {
            std::fs::create_dir_all(&base_path).map_err(|e| {
                StarError::resource_error(format!("Failed to create directory: {}", e))
            })?;
        }
        Ok(Self {
            base_path,
            triple_data: None,
            index: ChunkedIndex::new(),
            stats: StoreStatistics::default(),
            config,
        })
    }

    /// Serializes `triples`, persists the bytes to `triples.bin` through a
    /// memory map, and indexes the batch as a new chunk.
    ///
    /// # Errors
    /// Propagates serialization and memory-mapping failures.
    pub fn insert_batch(&mut self, triples: &[StarTriple]) -> StarResult<()> {
        let serialized = self.serialize_triples(triples)?;
        // Record the byte count up front: `serialized` is moved into the
        // first-write path below, so it cannot be measured afterwards.
        // (Replaces a full `.clone()` that existed only to keep the length.)
        let serialized_len = serialized.len();
        let data_path = self.base_path.join("triples.bin");
        if self.triple_data.is_none() {
            // First write: materialize the data file directly from this batch.
            let array = Array1::from_vec(serialized);
            let mmap = create_mmap(&array, &data_path, AccessMode::ReadWrite, 0).map_err(|e| {
                StarError::resource_error(format!("Failed to create memory map: {}", e))
            })?;
            self.triple_data = Some(mmap);
        } else {
            self.append_data(&serialized)?;
        }
        self.update_indices_chunked(triples)?;
        self.stats.total_triples += triples.len();
        self.stats.disk_usage += serialized_len;
        Ok(())
    }

    /// Streams every triple whose subject matches `subject` through
    /// `processor`, chunk by chunk, without materializing the result set.
    ///
    /// # Errors
    /// Propagates failures from chunk loading and from `processor` itself.
    pub fn query_by_subject_chunked<F>(
        &self,
        subject: &StarTerm,
        mut processor: F,
    ) -> StarResult<()>
    where
        F: FnMut(&StarTriple) -> StarResult<()>,
    {
        let subject_key = self.term_to_key(subject);
        for (chunk_id, chunk_index) in self.index.subject_chunks.iter().enumerate() {
            if let Some(indices) = chunk_index.get(&subject_key) {
                for &idx in indices {
                    let triple = self.load_triple_from_chunk(chunk_id, idx)?;
                    processor(&triple)?;
                }
            }
        }
        Ok(())
    }

    /// Feeds each chunk's triples to `processor`, in chunk order.
    ///
    /// # Errors
    /// Propagates failures from chunk loading and from `processor` itself.
    pub fn process_all_chunks<F>(&self, mut processor: F) -> StarResult<()>
    where
        F: FnMut(&[StarTriple]) -> StarResult<()>,
    {
        for chunk_id in 0..self.index.subject_chunks.len() {
            let triples = self.load_chunk(chunk_id)?;
            processor(&triples)?;
        }
        Ok(())
    }

    /// Compacts storage, rebuilds the indexes, and refreshes statistics.
    // NOTE(review): `compact_storage` is a no-op and `rebuild_indices` only
    // clears the index, so calling this currently drops all index entries.
    pub fn optimize(&mut self) -> StarResult<()> {
        self.compact_storage()?;
        self.rebuild_indices()?;
        self.update_statistics()?;
        Ok(())
    }

    /// Returns the current usage counters.
    pub fn statistics(&self) -> &StoreStatistics {
        &self.stats
    }

    /// Encodes a batch of triples with oxicode's standard configuration.
    fn serialize_triples(&self, triples: &[StarTriple]) -> StarResult<Vec<u8>> {
        oxicode::serde::encode_to_vec(&triples, oxicode::config::standard())
            .map_err(|e| StarError::serialization_error(format!("Serialization failed: {}", e)))
    }

    /// Decodes a batch of triples previously written by `serialize_triples`.
    #[allow(dead_code)]
    fn deserialize_triples(&self, data: &[u8]) -> StarResult<Vec<StarTriple>> {
        oxicode::serde::decode_from_slice(data, oxicode::config::standard())
            .map(|(triples, _)| triples)
            .map_err(|e| StarError::parse_error(format!("Deserialization failed: {}", e)))
    }

    /// Appends `data` to the on-disk triple file and rebuilds the memory map
    /// over the combined contents.
    ///
    /// Fix: the previous implementation recreated the map from the new bytes
    /// only, silently discarding everything persisted by earlier batches.
    fn append_data(&mut self, data: &[u8]) -> StarResult<()> {
        let data_path = self.base_path.join("triples.bin");
        // Preserve previously persisted bytes before re-materializing the map.
        let mut combined = std::fs::read(&data_path).map_err(|e| {
            StarError::resource_error(format!("Failed to read existing data: {}", e))
        })?;
        combined.extend_from_slice(data);
        let array = Array1::from_vec(combined);
        let mmap = create_mmap(&array, &data_path, AccessMode::ReadWrite, 0).map_err(|e| {
            StarError::resource_error(format!("Failed to recreate memory map: {}", e))
        })?;
        self.triple_data = Some(mmap);
        Ok(())
    }

    /// Indexes `triples` as one new chunk, keyed by subject, predicate, and
    /// object. Positions stored in the index are batch-local offsets.
    ///
    /// Fix: every batch now gets its own chunk. Previously, batches smaller
    /// than `config.chunk_size` were merged into the last chunk while their
    /// positions restarted at 0, so offsets from different batches collided
    /// within the same chunk map.
    fn update_indices_chunked(&mut self, triples: &[StarTriple]) -> StarResult<()> {
        self.index.add_chunk();
        let current_chunk_id = self.index.subject_chunks.len() - 1;
        // Disjoint field borrows; `term_to_key_static` takes no `&self`, so
        // the keys can be built inline without an intermediate Vec.
        let subject_chunk = &mut self.index.subject_chunks[current_chunk_id];
        let predicate_chunk = &mut self.index.predicate_chunks[current_chunk_id];
        let object_chunk = &mut self.index.object_chunks[current_chunk_id];
        for (idx, triple) in triples.iter().enumerate() {
            subject_chunk
                .entry(Self::term_to_key_static(&triple.subject))
                .or_default()
                .push(idx);
            predicate_chunk
                .entry(Self::term_to_key_static(&triple.predicate))
                .or_default()
                .push(idx);
            object_chunk
                .entry(Self::term_to_key_static(&triple.object))
                .or_default()
                .push(idx);
        }
        Ok(())
    }

    /// Returns the index key for `term`.
    fn term_to_key(&self, term: &StarTerm) -> String {
        Self::term_to_key_static(term)
    }

    /// Builds an index key from a term's `Debug` representation.
    // NOTE(review): `{:?}` output is not a canonical RDF serialization and may
    // change if `StarTerm`'s Debug impl changes — consider a stable encoding.
    fn term_to_key_static(term: &StarTerm) -> String {
        format!("{:?}", term)
    }

    /// Loads one triple by chunk id and in-chunk offset.
    // TODO: not implemented — always errors, which also makes
    // `query_by_subject_chunked` fail and `ChunkedIterator` yield nothing.
    fn load_triple_from_chunk(&self, _chunk_id: usize, _idx: usize) -> StarResult<StarTriple> {
        Err(StarError::processing_error("Not implemented"))
    }

    /// Loads all triples of a chunk.
    // TODO: not implemented — returns an empty Vec.
    fn load_chunk(&self, _chunk_id: usize) -> StarResult<Vec<StarTriple>> {
        Ok(Vec::new())
    }

    /// Rewrites storage to reclaim space.
    // TODO: not implemented — currently a no-op.
    fn compact_storage(&mut self) -> StarResult<()> {
        Ok(())
    }

    /// Rebuilds the chunk indexes from storage.
    // TODO: only clears the index today; repopulation is not implemented.
    fn rebuild_indices(&mut self) -> StarResult<()> {
        self.index = ChunkedIndex::new();
        Ok(())
    }

    /// Refreshes `stats.memory_usage` from the active memory map's size.
    fn update_statistics(&mut self) -> StarResult<()> {
        if let Some(ref mmap) = self.triple_data {
            self.stats.memory_usage = mmap.size;
        }
        Ok(())
    }
}
/// Lazy iterator over every triple in a store, walking chunks in order.
/// Created via `MemoryEfficientStore::iter_chunked`.
pub struct ChunkedIterator<'a> {
    /// Store being iterated.
    store: &'a MemoryEfficientStore,
    /// Index of the chunk currently being read.
    current_chunk: usize,
    /// Offset of the next triple within the current chunk.
    chunk_offset: usize,
    /// Total chunk count captured at construction time.
    total_chunks: usize,
}
impl<'a> ChunkedIterator<'a> {
fn new(store: &'a MemoryEfficientStore) -> Self {
let total_chunks = store.index.subject_chunks.len();
Self {
store,
current_chunk: 0,
chunk_offset: 0,
total_chunks,
}
}
}
impl<'a> Iterator for ChunkedIterator<'a> {
    type Item = StarResult<StarTriple>;

    /// Yields the next triple, advancing to the following chunk whenever a
    /// load fails (a load error is treated as "end of chunk", matching the
    /// original behavior).
    ///
    /// Fix: the previous implementation advanced chunks by calling
    /// `self.next()` recursively — one stack frame per exhausted chunk, which
    /// could overflow the stack on stores with many chunks. This loop is
    /// observably identical but iterative.
    fn next(&mut self) -> Option<Self::Item> {
        while self.current_chunk < self.total_chunks {
            match self
                .store
                .load_triple_from_chunk(self.current_chunk, self.chunk_offset)
            {
                Ok(triple) => {
                    self.chunk_offset += 1;
                    return Some(Ok(triple));
                }
                Err(_) => {
                    // End of this chunk (or a genuine load error — the two are
                    // indistinguishable here); move to the next chunk.
                    self.current_chunk += 1;
                    self.chunk_offset = 0;
                }
            }
        }
        None
    }
}
impl MemoryEfficientStore {
    /// Returns a lazy iterator over all stored triples, chunk by chunk.
    /// Each item is a `StarResult` because chunk loading can fail mid-walk.
    pub fn iter_chunked(&self) -> ChunkedIterator<'_> {
        ChunkedIterator::new(self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;

    /// The constructor should create the backing directory on demand.
    #[test]
    fn test_store_creation() {
        let path = temp_dir().join("oxirs_star_test_store");
        assert!(MemoryEfficientStore::new(&path).is_ok());
        let _ = std::fs::remove_dir_all(&path);
    }

    /// Default config carries the documented chunk size and compression flag.
    #[test]
    fn test_config_default() {
        let StoreConfig {
            chunk_size,
            enable_compression,
            ..
        } = StoreConfig::default();
        assert_eq!(chunk_size, 10000);
        assert!(enable_compression);
    }

    /// A fresh store reports zero triples.
    #[test]
    fn test_statistics() {
        let path = temp_dir().join("oxirs_star_test_stats");
        let store = MemoryEfficientStore::new(&path).expect("store creation failed");
        assert_eq!(store.statistics().total_triples, 0);
        let _ = std::fs::remove_dir_all(&path);
    }
}