use std::fs;
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use oxirs_core::rdf_store::ConcreteStore as CoreStore;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, span, Level};
use scirs2_core::memory_efficient::MemoryMappedArray;
use scirs2_core::profiling::Profiler;
use crate::model::{StarTerm, StarTriple};
use crate::store::StarStore;
use crate::{StarConfig, StarError, StarResult, StarStatistics};
/// Serialization-friendly snapshot of a [`StarStore`]: the full triple set
/// plus the configuration and statistics needed to rebuild it on load.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SerializableStarStore {
    // Every triple in the store at snapshot time.
    triples: Vec<StarTriple>,
    // Store configuration, restored when the snapshot is replayed.
    config: StarConfig,
    // Statistics captured at snapshot time.
    statistics: StarStatistics,
}
impl SerializableStarStore {
    /// Captures a live [`StarStore`] as a serializable snapshot: all triples
    /// (wildcard query), the configuration, and the current statistics.
    fn from_store(store: &StarStore) -> StarResult<Self> {
        Ok(Self {
            triples: store.query(None, None, None)?,
            config: store.config().clone(),
            statistics: store.statistics(),
        })
    }

    /// Rebuilds a [`StarStore`] from this snapshot by replaying every
    /// captured triple into a fresh store with the saved configuration.
    fn into_store(self) -> StarResult<StarStore> {
        let rebuilt = StarStore::with_config(self.config);
        for captured in self.triples {
            rebuilt.insert(&captured)?;
        }
        Ok(rebuilt)
    }
}
/// Storage backend variants for RDF-star data.
///
/// Every variant shares its `StarStore` behind `Arc<RwLock<..>>`, so a
/// backend can be cloned cheaply and used from multiple threads.
#[derive(Clone)]
pub enum StarStorageBackend {
    /// Volatile in-memory storage; contents are lost on drop.
    Memory {
        config: StarConfig,
        store: Arc<RwLock<StarStore>>,
    },
    /// Disk-backed storage with optional compression and auto-save.
    Persistent {
        config: StarPersistenceConfig,
        store: Arc<RwLock<StarStore>>,
        // Companion core RDF store (N-Quads file) kept next to the star data.
        core_store: Arc<RwLock<CoreStore>>,
    },
    /// In-memory storage instrumented with a profiler; SIMD/parallel knobs
    /// live in the config.
    UltraPerformance {
        config: UltraPerformanceConfig,
        store: Arc<RwLock<StarStore>>,
        profiler: Arc<RwLock<Profiler>>,
    },
    /// Storage synced to a memory-mapped file.
    MemoryMapped {
        config: MemoryMappedConfig,
        store: Arc<RwLock<StarStore>>,
        // Lazily initialized mapping: stays `None` until first use.
        mmap_array: Arc<RwLock<Option<MemoryMappedArray<u8>>>>,
    },
}
/// Configuration for the `Persistent` backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StarPersistenceConfig {
    /// Directory holding `quoted_triples.bin` and the core store file.
    pub path: PathBuf,
    /// When true, the store is flushed to disk after every insert.
    pub auto_save: bool,
    /// Seconds between periodic saves.
    /// NOTE(review): not referenced in this file — presumably consumed by a
    /// background saver elsewhere; confirm.
    pub save_interval_secs: u64,
    /// Compress `quoted_triples.bin` on save (and decompress on load).
    pub compression_enabled: bool,
    /// Codec used when compression is enabled.
    pub compression_algorithm: CompressionAlgorithm,
    /// Enable write-ahead logging.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub wal_enabled: bool,
    /// Maximum WAL size in bytes.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub max_wal_size: usize,
    /// Configuration for the underlying star store.
    pub star_config: StarConfig,
}
impl Default for StarPersistenceConfig {
fn default() -> Self {
Self {
path: PathBuf::from("./rdf_star_storage"),
auto_save: true,
save_interval_secs: 60,
compression_enabled: true,
compression_algorithm: CompressionAlgorithm::Zstd,
wal_enabled: true,
max_wal_size: 64 * 1024 * 1024, star_config: StarConfig::default(),
}
}
}
/// Compression codecs supported for persisted data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// Store bytes verbatim.
    None,
    /// Zstandard (encoded at level 3 — see `compress_data`).
    Zstd,
    /// LZ4 (decompression output capped at 100 MiB — see `decompress_data`).
    Lz4,
    /// Gzip via flate2 at the default compression level.
    Gzip,
}
/// Tuning knobs for the `UltraPerformance` backend.
#[derive(Clone)]
pub struct UltraPerformanceConfig {
    /// Configuration for the underlying star store.
    pub star_config: StarConfig,
    /// Enable SIMD code paths.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub enable_simd: bool,
    /// Enable parallel execution.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub enable_parallel: bool,
    /// Worker thread count (logged at init; defaults to logical CPU count).
    pub worker_threads: usize,
    /// Buffer pool size in bytes.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub buffer_pool_size: usize,
    /// Enable profiling output.
    /// NOTE(review): the profiler is started/stopped in
    /// `insert_quoted_triple` regardless of this flag — confirm intended gating.
    pub enable_profiling: bool,
}
impl Default for UltraPerformanceConfig {
fn default() -> Self {
Self {
star_config: StarConfig::default(),
enable_simd: true,
enable_parallel: true,
worker_threads: num_cpus::get(),
buffer_pool_size: 256 * 1024 * 1024, enable_profiling: false,
}
}
}
/// Configuration for the `MemoryMapped` backend.
#[derive(Clone)]
pub struct MemoryMappedConfig {
    /// Path of the backing memory-mapped file.
    pub file_path: PathBuf,
    /// Initial mapping size in bytes.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub initial_size: usize,
    /// Grow the backing file automatically when full.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub auto_resize: bool,
    /// Growth multiplier applied when resizing.
    /// NOTE(review): not referenced in this file — confirm usage elsewhere.
    pub resize_factor: f64,
    /// Configuration for the underlying star store.
    pub star_config: StarConfig,
}
impl Default for MemoryMappedConfig {
fn default() -> Self {
Self {
file_path: PathBuf::from("./rdf_star.mmap"),
initial_size: 1024 * 1024 * 1024, auto_resize: true,
resize_factor: 2.0,
star_config: StarConfig::default(),
}
}
}
impl StarStorageBackend {
pub fn memory(config: StarConfig) -> StarResult<Self> {
let store = Arc::new(RwLock::new(StarStore::with_config(config.clone())));
Ok(Self::Memory { config, store })
}
pub fn persistent(config: StarPersistenceConfig) -> StarResult<Self> {
let span = span!(Level::INFO, "persistent_backend_init");
let _enter = span.enter();
info!("Initializing persistent storage at {:?}", config.path);
fs::create_dir_all(&config.path).map_err(|e| StarError::ConfigurationError {
message: format!("Failed to create storage directory: {}", e),
parameter: Some("path".to_string()),
valid_range: None,
})?;
let star_store = Arc::new(RwLock::new(StarStore::with_config(
config.star_config.clone(),
)));
let core_path = config.path.join("core_store.nq");
let core_store =
Arc::new(RwLock::new(
CoreStore::open(core_path.to_str().ok_or_else(|| {
StarError::ConfigurationError {
message: "Invalid path encoding".to_string(),
parameter: Some("path".to_string()),
valid_range: None,
}
})?)
.map_err(StarError::CoreError)?,
));
let backend = Self::Persistent {
config: config.clone(),
store: star_store,
core_store,
};
backend.load_from_disk()?;
info!("Persistent storage initialized successfully");
Ok(backend)
}
pub fn ultra_performance(config: UltraPerformanceConfig) -> StarResult<Self> {
let span = span!(Level::INFO, "ultra_performance_backend_init");
let _enter = span.enter();
info!("Initializing ultra-high-performance storage");
let star_store = Arc::new(RwLock::new(StarStore::with_config(
config.star_config.clone(),
)));
let profiler = Arc::new(RwLock::new(Profiler::new()));
info!(
"Ultra-high-performance storage initialized with {} worker threads",
config.worker_threads
);
Ok(Self::UltraPerformance {
config,
store: star_store,
profiler,
})
}
pub fn memory_mapped(config: MemoryMappedConfig) -> StarResult<Self> {
let span = span!(Level::INFO, "memory_mapped_backend_init");
let _enter = span.enter();
info!(
"Initializing memory-mapped storage at {:?}",
config.file_path
);
if let Some(parent) = config.file_path.parent() {
fs::create_dir_all(parent).map_err(|e| StarError::ConfigurationError {
message: format!("Failed to create directory: {}", e),
parameter: Some("file_path".to_string()),
valid_range: None,
})?;
}
let star_store = Arc::new(RwLock::new(StarStore::with_config(
config.star_config.clone(),
)));
let mmap_array = Arc::new(RwLock::new(None));
info!("Memory-mapped storage initialized");
Ok(Self::MemoryMapped {
config,
store: star_store,
mmap_array,
})
}
pub fn insert_quoted_triple(&mut self, triple: &StarTriple) -> StarResult<bool> {
match self {
Self::Memory { store, .. } => {
let store_guard = store.write().unwrap_or_else(|e| e.into_inner());
let initial_len = store_guard.len();
store_guard.insert(triple)?;
let final_len = store_guard.len();
Ok(final_len > initial_len)
}
Self::Persistent { store, config, .. } => {
let store_guard = store.write().unwrap_or_else(|e| e.into_inner());
let initial_len = store_guard.len();
store_guard.insert(triple)?;
let final_len = store_guard.len();
let was_inserted = final_len > initial_len;
if config.auto_save {
drop(store_guard);
self.save_to_disk()?;
}
Ok(was_inserted)
}
Self::UltraPerformance {
store,
profiler,
config: _,
} => {
let mut profiler_guard = profiler.write().unwrap_or_else(|e| e.into_inner());
profiler_guard.start();
let store_guard = store.write().unwrap_or_else(|e| e.into_inner());
let initial_len = store_guard.len();
store_guard.insert(triple)?;
let final_len = store_guard.len();
profiler_guard.stop();
Ok(final_len > initial_len)
}
Self::MemoryMapped { store, .. } => {
let store_guard = store.write().unwrap_or_else(|e| e.into_inner());
let initial_len = store_guard.len();
store_guard.insert(triple)?;
let final_len = store_guard.len();
Ok(final_len > initial_len)
}
}
}
pub fn query_quoted_triples(
&self,
subject: Option<&StarTerm>,
predicate: Option<&StarTerm>,
object: Option<&StarTerm>,
) -> StarResult<Vec<StarTriple>> {
match self {
Self::Memory { store, .. }
| Self::Persistent { store, .. }
| Self::UltraPerformance { store, .. }
| Self::MemoryMapped { store, .. } => {
let store_guard = store.read().unwrap_or_else(|e| e.into_inner());
StarStore::query(&store_guard, subject, predicate, object)
}
}
}
pub fn save_to_disk(&self) -> StarResult<()> {
match self {
Self::Persistent {
store,
core_store: _,
config,
} => {
let span = span!(Level::DEBUG, "save_to_disk");
let _enter = span.enter();
debug!("Saving RDF-star data to disk");
let quoted_triples_path = config.path.join("quoted_triples.bin");
let store_guard = store.read().unwrap_or_else(|e| e.into_inner());
let serializable = SerializableStarStore::from_store(&store_guard)?;
drop(store_guard);
let serialized =
oxicode::serde::encode_to_vec(&serializable, oxicode::config::standard())
.map_err(|e| {
StarError::serialization_error(format!(
"Failed to serialize store: {}",
e
))
})?;
let data = if config.compression_enabled {
compress_data(&serialized, config.compression_algorithm)?
} else {
serialized
};
let mut file = fs::File::create("ed_triples_path).map_err(|e| {
StarError::serialization_error(format!("Failed to create file: {}", e))
})?;
file.write_all(&data).map_err(|e| {
StarError::serialization_error(format!("Failed to write file: {}", e))
})?;
debug!("Saved {} bytes to {:?}", data.len(), quoted_triples_path);
Ok(())
}
Self::MemoryMapped {
store,
mmap_array: _,
config,
} => {
let span = span!(Level::DEBUG, "save_mmap");
let _enter = span.enter();
debug!("Syncing memory-mapped storage");
let store_guard = store.read().unwrap_or_else(|e| e.into_inner());
let serializable = SerializableStarStore::from_store(&store_guard)?;
drop(store_guard);
let serialized =
oxicode::serde::encode_to_vec(&serializable, oxicode::config::standard())
.map_err(|e| {
StarError::serialization_error(format!("Failed to serialize: {}", e))
})?;
let file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&config.file_path)
.map_err(|e| StarError::ConfigurationError {
message: format!("Failed to open mmap file: {}", e),
parameter: Some("file_path".to_string()),
valid_range: None,
})?;
file.set_len(serialized.len() as u64).map_err(|e| {
StarError::serialization_error(format!("Failed to resize file: {}", e))
})?;
let mut writer = BufWriter::new(file);
writer.write_all(&serialized).map_err(|e| {
StarError::serialization_error(format!("Failed to write: {}", e))
})?;
writer.flush().map_err(|e| {
StarError::serialization_error(format!("Failed to flush: {}", e))
})?;
debug!("Synced {} bytes to memory-mapped file", serialized.len());
Ok(())
}
_ => {
Err(StarError::ConfigurationError {
message: "Backend does not support disk persistence".to_string(),
parameter: Some("backend_type".to_string()),
valid_range: Some("Persistent, MemoryMapped".to_string()),
})
}
}
}
pub fn load_from_disk(&self) -> StarResult<()> {
match self {
Self::Persistent {
store,
core_store: _,
config,
} => {
let span = span!(Level::DEBUG, "load_from_disk");
let _enter = span.enter();
debug!("Loading RDF-star data from disk");
let quoted_triples_path = config.path.join("quoted_triples.bin");
if !quoted_triples_path.exists() {
debug!("No existing data file found, starting with empty store");
return Ok(());
}
let mut file = fs::File::open("ed_triples_path).map_err(|e| {
StarError::resource_error(format!("Failed to open file: {}", e))
})?;
let mut data = Vec::new();
file.read_to_end(&mut data).map_err(|e| {
StarError::resource_error(format!("Failed to read file: {}", e))
})?;
let serialized = if config.compression_enabled {
decompress_data(&data, config.compression_algorithm)?
} else {
data
};
let (serializable, _): (SerializableStarStore, usize) =
oxicode::serde::decode_from_slice(&serialized, oxicode::config::standard())
.map_err(|e| {
StarError::resource_error(format!("Failed to deserialize store: {}", e))
})?;
let loaded_store = serializable.into_store()?;
let mut store_guard = store.write().unwrap_or_else(|e| e.into_inner());
*store_guard = loaded_store;
debug!("Loaded {} quoted triples from disk", store_guard.len());
Ok(())
}
Self::MemoryMapped { store, config, .. } => {
let span = span!(Level::DEBUG, "load_mmap");
let _enter = span.enter();
if !config.file_path.exists() {
debug!("No existing memory-mapped file, starting empty");
return Ok(());
}
let file = fs::File::open(&config.file_path).map_err(|e| {
StarError::resource_error(format!("Failed to open mmap file: {}", e))
})?;
let mut reader = BufReader::new(file);
let mut data = Vec::new();
reader
.read_to_end(&mut data)
.map_err(|e| StarError::resource_error(format!("Failed to read: {}", e)))?;
if data.is_empty() {
debug!("Empty memory-mapped file, starting empty");
return Ok(());
}
let (serializable, _): (SerializableStarStore, usize) =
oxicode::serde::decode_from_slice(&data, oxicode::config::standard()).map_err(
|e| StarError::resource_error(format!("Failed to deserialize: {}", e)),
)?;
let loaded_store = serializable.into_store()?;
let mut store_guard = store.write().unwrap_or_else(|e| e.into_inner());
*store_guard = loaded_store;
debug!(
"Loaded {} quoted triples from memory-mapped file",
store_guard.len()
);
Ok(())
}
_ => Err(StarError::ConfigurationError {
message: "Backend does not support disk loading".to_string(),
parameter: Some("backend_type".to_string()),
valid_range: Some("Persistent, MemoryMapped".to_string()),
}),
}
}
pub fn get_statistics(&self) -> StorageStatistics {
match self {
Self::Memory { store, .. }
| Self::Persistent { store, .. }
| Self::UltraPerformance { store, .. }
| Self::MemoryMapped { store, .. } => {
let store_guard = store.read().unwrap_or_else(|e| e.into_inner());
let star_stats = store_guard.statistics();
StorageStatistics {
quoted_triples_count: star_stats.quoted_triples_count,
max_nesting_depth: star_stats.max_nesting_encountered,
backend_type: self.backend_type_name(),
memory_usage_bytes: self.estimate_memory_usage(),
}
}
}
}
fn backend_type_name(&self) -> String {
match self {
Self::Memory { .. } => "Memory".to_string(),
Self::Persistent { .. } => "Persistent".to_string(),
Self::UltraPerformance { .. } => "UltraPerformance".to_string(),
Self::MemoryMapped { .. } => "MemoryMapped".to_string(),
}
}
fn estimate_memory_usage(&self) -> usize {
match self {
Self::Memory { store, .. }
| Self::Persistent { store, .. }
| Self::UltraPerformance { store, .. }
| Self::MemoryMapped { store, .. } => {
let store_guard = store.read().unwrap_or_else(|e| e.into_inner());
let stats = store_guard.statistics();
stats.quoted_triples_count * 500
}
}
}
}
/// Summary statistics reported by `StarStorageBackend::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStatistics {
    /// Number of quoted triples currently stored.
    pub quoted_triples_count: usize,
    /// Deepest quotation nesting level encountered by the store.
    pub max_nesting_depth: usize,
    /// Backend variant name ("Memory", "Persistent", ...).
    pub backend_type: String,
    /// Rough memory estimate (500 bytes per quoted triple heuristic).
    pub memory_usage_bytes: usize,
}
/// Compresses `data` with the requested codec.
///
/// `None` copies the bytes verbatim; failures from any codec are wrapped in
/// a serialization error.
fn compress_data(data: &[u8], algorithm: CompressionAlgorithm) -> StarResult<Vec<u8>> {
    match algorithm {
        CompressionAlgorithm::None => Ok(data.to_vec()),
        // Zstd at compression level 3.
        CompressionAlgorithm::Zstd => oxiarc_zstd::encode_all(data, 3).map_err(|e| {
            StarError::serialization_error(format!("Zstd compression failed: {}", e))
        }),
        CompressionAlgorithm::Lz4 => oxiarc_lz4::compress(data).map_err(|e| {
            StarError::serialization_error(format!("LZ4 compression failed: {}", e))
        }),
        CompressionAlgorithm::Gzip => {
            use flate2::write::GzEncoder;
            use flate2::Compression;
            let mut gz = GzEncoder::new(Vec::new(), Compression::default());
            gz.write_all(data).map_err(|e| {
                StarError::serialization_error(format!("Gzip compression failed: {}", e))
            })?;
            gz.finish()
                .map_err(|e| StarError::serialization_error(format!("Gzip finish failed: {}", e)))
        }
    }
}
/// Decompresses `data` previously produced by `compress_data` with the same
/// `algorithm`; failures are wrapped in a resource error.
fn decompress_data(data: &[u8], algorithm: CompressionAlgorithm) -> StarResult<Vec<u8>> {
    match algorithm {
        CompressionAlgorithm::None => Ok(data.to_vec()),
        CompressionAlgorithm::Zstd => oxiarc_zstd::decode_all(data).map_err(|e| {
            StarError::resource_error(format!("Zstd decompression failed: {}", e))
        }),
        // The 100 MiB argument caps the decoded output size.
        CompressionAlgorithm::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
            .map_err(|e| StarError::resource_error(format!("LZ4 decompression failed: {}", e))),
        CompressionAlgorithm::Gzip => {
            use flate2::read::GzDecoder;
            let mut gz = GzDecoder::new(data);
            let mut out = Vec::new();
            gz.read_to_end(&mut out).map_err(|e| {
                StarError::resource_error(format!("Gzip decompression failed: {}", e))
            })?;
            Ok(out)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{StarTerm, StarTriple};

    /// Memory backend: first insert of a triple reports `true`, a duplicate
    /// insert reports `false`, and a wildcard query sees exactly one triple.
    #[test]
    fn test_memory_backend() -> StarResult<()> {
        let mut backend = StarStorageBackend::memory(StarConfig::default())?;
        let triple = StarTriple::new(
            StarTerm::iri("http://example.org/s")?,
            StarTerm::iri("http://example.org/p")?,
            StarTerm::literal("object")?,
        );
        assert!(backend.insert_quoted_triple(&triple)?);
        // Re-inserting the same triple must be a deduplicated no-op.
        assert!(!backend.insert_quoted_triple(&triple)?);
        let results = backend.query_quoted_triples(None, None, None)?;
        assert_eq!(results.len(), 1);
        Ok(())
    }

    /// Round-trips sample bytes through every real codec (Zstd, LZ4, Gzip)
    /// and checks compress → decompress is the identity.
    #[test]
    fn test_compression() -> StarResult<()> {
        let data = b"Hello, RDF-star world! This is test data for compression.";
        let compressed_zstd = compress_data(data, CompressionAlgorithm::Zstd)?;
        let decompressed_zstd = decompress_data(&compressed_zstd, CompressionAlgorithm::Zstd)?;
        assert_eq!(data, decompressed_zstd.as_slice());
        let compressed_lz4 = compress_data(data, CompressionAlgorithm::Lz4)?;
        let decompressed_lz4 = decompress_data(&compressed_lz4, CompressionAlgorithm::Lz4)?;
        assert_eq!(data, decompressed_lz4.as_slice());
        let compressed_gzip = compress_data(data, CompressionAlgorithm::Gzip)?;
        let decompressed_gzip = decompress_data(&compressed_gzip, CompressionAlgorithm::Gzip)?;
        assert_eq!(data, decompressed_gzip.as_slice());
        Ok(())
    }

    /// Ultra-performance backend accepts a nested (quoted-subject) triple and
    /// reports correct statistics and backend name.
    #[test]
    fn test_ultra_performance_backend() -> StarResult<()> {
        let config = UltraPerformanceConfig {
            enable_simd: true,
            enable_parallel: true,
            worker_threads: 2,
            ..Default::default()
        };
        let mut backend = StarStorageBackend::ultra_performance(config)?;
        let inner = StarTriple::new(
            StarTerm::iri("http://example.org/inner_s")?,
            StarTerm::iri("http://example.org/inner_p")?,
            StarTerm::literal("inner_o")?,
        );
        // Quote the inner triple as the subject of the outer one.
        let triple = StarTriple::new(
            StarTerm::quoted_triple(inner),
            StarTerm::iri("http://example.org/p")?,
            StarTerm::literal("object")?,
        );
        assert!(backend.insert_quoted_triple(&triple)?);
        let stats = backend.get_statistics();
        assert_eq!(stats.quoted_triples_count, 1);
        assert_eq!(stats.backend_type, "UltraPerformance");
        Ok(())
    }
}