pub mod compression;
pub mod immutable;
pub mod mmap_storage;
pub mod mvcc;
pub mod temporal;
#[cfg(feature = "rocksdb")]
pub mod tiered;
pub mod virtualization;
pub use mvcc::{IsolationLevel, MvccConfig, MvccStore, TransactionId as MvccTransactionId};
use parking_lot::RwLock;
use crate::OxirsError;
use std::path::Path;
use std::sync::Arc;
/// Top-level, serde-serializable configuration handed to a storage engine.
///
/// The `Default` impl in this file turns on all three feature flags, selects
/// `CompressionType::Zstd { level: 3 }`, uses `TierConfig::default()` and sets
/// `cache_size_mb` to 1024.
///
/// NOTE(review): `SimpleStorageEngine` in this file stores the config but does
/// not read any field; the meaning of each flag is defined by consumers that
/// are not visible here.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StorageConfig {
/// Feature flag for tiered storage (presumably governs use of `tiers` — confirm against consumers).
pub enable_tiering: bool,
/// Feature flag for columnar storage (semantics not visible in this file).
pub enable_columnar: bool,
/// Feature flag for temporal storage (presumably related to the `temporal` module — confirm).
pub enable_temporal: bool,
/// Compression scheme selection; see `CompressionType`.
pub compression: CompressionType,
/// Per-tier settings (hot, warm, cold, archive).
pub tiers: TierConfig,
/// Cache size; the `_mb` suffix suggests megabytes — TODO confirm the unit with the consumer.
pub cache_size_mb: usize,
}
impl Default for StorageConfig {
    /// Returns a configuration with every feature flag enabled, Zstd level 3
    /// compression, default tier settings and a cache size of 1024.
    fn default() -> Self {
        let compression = CompressionType::Zstd { level: 3 };
        let tiers = TierConfig::default();
        Self {
            enable_tiering: true,
            enable_columnar: true,
            enable_temporal: true,
            compression,
            tiers,
            cache_size_mb: 1024,
        }
    }
}
/// Selects a compression scheme and its tuning parameter.
///
/// This file only names the schemes; the code that applies them is not
/// visible here (presumably the `compression` module — confirm).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CompressionType {
/// No compression.
None,
/// LZ4 with an unsigned `level` (valid range not shown in this file).
Lz4 { level: u32 },
/// Zstd with a signed `level`; `StorageConfig::default()` uses level 3.
Zstd { level: i32 },
/// Custom RDF-oriented scheme parameterized by `dictionary_size` (unit not shown in this file).
RdfCustom { dictionary_size: usize },
}
/// Groups the settings of the four storage tiers.
///
/// `Default` is derived, so each field takes its own type's `Default` value.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct TierConfig {
/// Settings for the hot tier.
pub hot_tier: HotTierConfig,
/// Settings for the warm tier.
pub warm_tier: WarmTierConfig,
/// Settings for the cold tier.
pub cold_tier: ColdTierConfig,
/// Settings for the archive tier.
pub archive_tier: ArchiveTierConfig,
}
/// Settings for the hot tier.
///
/// Defaults (from the `Default` impl in this file): `max_size_mb = 4096`,
/// `eviction_policy = EvictionPolicy::Lru`, `ttl_seconds = Some(3600)`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HotTierConfig {
/// Size limit; the `_mb` suffix suggests megabytes — TODO confirm with the consumer.
pub max_size_mb: usize,
/// Policy used to pick entries for eviction.
pub eviction_policy: EvictionPolicy,
/// Optional time-to-live in seconds (per the field name); the meaning of `None` is not shown here — presumably "no expiry".
pub ttl_seconds: Option<u64>,
}
impl Default for HotTierConfig {
    /// Returns hot-tier settings with a 4096 size limit, LRU eviction and a
    /// TTL of 3600 seconds.
    fn default() -> Self {
        let ttl_seconds = Some(3600);
        Self {
            max_size_mb: 4096,
            eviction_policy: EvictionPolicy::Lru,
            ttl_seconds,
        }
    }
}
/// Settings for the warm tier.
///
/// Defaults (from the `Default` impl in this file): `path = "/var/oxirs/warm"`,
/// `max_size_gb = 100`, `promotion_threshold = 10`, `demotion_threshold_days = 7`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WarmTierConfig {
/// Filesystem location for this tier, kept as a plain string.
pub path: String,
/// Size limit; the `_gb` suffix suggests gigabytes — TODO confirm with the consumer.
pub max_size_gb: usize,
/// Promotion threshold (presumably an access count that triggers promotion — confirm).
pub promotion_threshold: u32,
/// Demotion threshold in days, per the field name (exact trigger not shown in this file).
pub demotion_threshold_days: u32,
}
impl Default for WarmTierConfig {
fn default() -> Self {
WarmTierConfig {
path: "/var/oxirs/warm".to_string(),
max_size_gb: 100,
promotion_threshold: 10,
demotion_threshold_days: 7,
}
}
}
/// Settings for the cold tier.
///
/// Defaults (from the `Default` impl in this file): `path = "/var/oxirs/cold"`,
/// `max_size_tb = 10`, `compression_level = 9`, `archive_threshold_days = 90`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColdTierConfig {
/// Filesystem location for this tier, kept as a plain string.
pub path: String,
/// Size limit; the `_tb` suffix suggests terabytes — TODO confirm with the consumer.
pub max_size_tb: usize,
/// Compression level for this tier (which codec it applies to is not shown in this file).
pub compression_level: i32,
/// Archive threshold in days, per the field name (exact trigger not shown in this file).
pub archive_threshold_days: u32,
}
impl Default for ColdTierConfig {
fn default() -> Self {
ColdTierConfig {
path: "/var/oxirs/cold".to_string(),
max_size_tb: 10,
compression_level: 9,
archive_threshold_days: 90,
}
}
}
/// Settings for the archive tier.
///
/// Defaults (from the `Default` impl in this file):
/// `backend = ArchiveBackend::Local("/var/oxirs/archive")`,
/// `retention_years = Some(7)`, `immutable = true`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ArchiveTierConfig {
/// Where archived data is kept.
pub backend: ArchiveBackend,
/// Optional retention period in years, per the field name; the meaning of `None` is not shown here — presumably "keep indefinitely".
pub retention_years: Option<u32>,
/// Immutability flag (enforcement is not visible in this file).
pub immutable: bool,
}
impl Default for ArchiveTierConfig {
    /// Returns archive settings using a local backend at `/var/oxirs/archive`,
    /// a retention of 7 years and the immutable flag set.
    fn default() -> Self {
        let backend = ArchiveBackend::Local(String::from("/var/oxirs/archive"));
        Self {
            backend,
            retention_years: Some(7),
            immutable: true,
        }
    }
}
/// Destination for archived data.
///
/// Only the addressing information is modeled here; no client code for any of
/// these backends is visible in this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ArchiveBackend {
/// Local storage identified by a string (a filesystem path in the default config).
Local(String),
/// S3 target addressed by `bucket` and `prefix`.
S3 { bucket: String, prefix: String },
/// GCS target addressed by `bucket` and `prefix`.
GCS { bucket: String, prefix: String },
/// Azure target addressed by `container` and `prefix`.
Azure { container: String, prefix: String },
}
/// Names the eviction policy used by `HotTierConfig`.
///
/// The variant names follow the common cache-replacement abbreviations; the
/// code that implements each policy is not visible in this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum EvictionPolicy {
/// Presumably least-recently-used; this is the hot tier's default.
Lru,
/// Presumably least-frequently-used.
Lfu,
/// Presumably first-in-first-out.
Fifo,
/// Adaptive policy (behavior not specified in this file).
Adaptive,
}
/// Asynchronous interface implemented by storage backends.
///
/// Implementors must be `Send + Sync`; `create_engine` in this file hands an
/// engine out as `Arc<dyn StorageEngine>`. Every method reports failure as
/// `OxirsError`.
#[async_trait::async_trait]
pub trait StorageEngine: Send + Sync {
/// Initializes the engine with `config`. This is the only method taking `&mut self`.
async fn init(&mut self, config: StorageConfig) -> Result<(), OxirsError>;
/// Stores a single triple.
async fn store_triple(&self, triple: &crate::model::Triple) -> Result<(), OxirsError>;
/// Stores a batch of triples.
async fn store_triples(&self, triples: &[crate::model::Triple]) -> Result<(), OxirsError>;
/// Returns the stored triples selected by `pattern`.
async fn query_triples(
&self,
pattern: &crate::model::TriplePattern,
) -> Result<Vec<crate::model::Triple>, OxirsError>;
/// Deletes the triples selected by `pattern` and returns a count
/// (`SimpleStorageEngine` returns the number of triples it deleted).
async fn delete_triples(
&self,
pattern: &crate::model::TriplePattern,
) -> Result<usize, OxirsError>;
/// Returns the engine's statistics.
async fn stats(&self) -> Result<StorageStats, OxirsError>;
/// Runs engine-specific maintenance (`SimpleStorageEngine` runs MVCC garbage collection).
async fn optimize(&self) -> Result<(), OxirsError>;
/// Writes a backup under `path` (`SimpleStorageEngine` treats `path` as a directory).
async fn backup(&self, path: &Path) -> Result<(), OxirsError>;
/// Loads a backup from `path` (`SimpleStorageEngine` treats `path` as a directory).
async fn restore(&self, path: &Path) -> Result<(), OxirsError>;
}
/// Snapshot of engine statistics returned by `StorageEngine::stats`.
///
/// NOTE(review): in the code visible in this file, `SimpleStorageEngine` only
/// ever updates `total_triples` and `tier_stats.hot.triple_count`; every other
/// value keeps what `SimpleStorageEngine::new` set.
#[derive(Debug, Clone)]
pub struct StorageStats {
/// Running count of stored triples.
pub total_triples: u64,
/// Total storage size in bytes, per the field name.
pub total_size_bytes: u64,
/// Per-tier breakdown.
pub tier_stats: TierStats,
/// Compression ratio; initialized to 1.0 by `SimpleStorageEngine::new` (direction of the ratio not shown here).
pub compression_ratio: f64,
/// Query performance figures.
pub query_metrics: QueryMetrics,
}
/// One `TierStat` per storage tier.
#[derive(Debug, Clone)]
pub struct TierStats {
/// Hot-tier figures; the only tier `SimpleStorageEngine` updates in this file.
pub hot: TierStat,
/// Warm-tier figures.
pub warm: TierStat,
/// Cold-tier figures.
pub cold: TierStat,
/// Archive-tier figures.
pub archive: TierStat,
}
/// Statistics for a single storage tier.
#[derive(Debug, Clone)]
pub struct TierStat {
/// Number of triples attributed to this tier.
pub triple_count: u64,
/// Size in bytes, per the field name.
pub size_bytes: u64,
/// Hit rate (presumably a fraction in 0.0..=1.0 — TODO confirm; never updated in this file).
pub hit_rate: f64,
/// Average access time; the `_us` suffix suggests microseconds — TODO confirm.
pub avg_access_time_us: u64,
}
/// Query performance figures carried inside `StorageStats`.
///
/// `SimpleStorageEngine::new` initializes every field to 0.0 and nothing in
/// this file updates them afterwards.
#[derive(Debug, Clone)]
pub struct QueryMetrics {
/// Average query time; the `_ms` suffix suggests milliseconds — TODO confirm.
pub avg_query_time_ms: f64,
/// 99th-percentile query time, per the field name; presumably milliseconds.
pub p99_query_time_ms: f64,
/// Presumably queries per second — TODO confirm.
pub qps: f64,
/// Cache hit rate (presumably a fraction in 0.0..=1.0 — TODO confirm).
pub cache_hit_rate: f64,
}
/// Builds a [`SimpleStorageEngine`] from `config` and returns it as a shared
/// [`StorageEngine`] trait object.
///
/// # Errors
///
/// Returns whatever error [`SimpleStorageEngine::new`] reports.
pub async fn create_engine(config: StorageConfig) -> Result<Arc<dyn StorageEngine>, OxirsError> {
    SimpleStorageEngine::new(config)
        .await
        .map(|engine| Arc::new(engine) as Arc<dyn StorageEngine>)
}
/// Storage engine that keeps triples in an `MvccStore` and tracks simple counters.
///
/// Constructed with `SimpleStorageEngine::new`; `create_engine` wraps it in an
/// `Arc<dyn StorageEngine>`.
pub struct SimpleStorageEngine {
// Stored at construction but not read anywhere in this file, hence the lint allowance.
#[allow(dead_code)]
config: StorageConfig,
// Transactional store that every read and write goes through.
mvcc_store: MvccStore,
// Counters returned by `stats()`; guarded by a `parking_lot::RwLock`.
stats: Arc<RwLock<StorageStats>>,
// Directory created by `new`; not read anywhere else in this file, hence the lint allowance.
#[allow(dead_code)]
base_path: std::path::PathBuf,
}
impl SimpleStorageEngine {
    /// Creates an engine with zeroed statistics and a freshly configured MVCC store.
    ///
    /// The storage directory `/tmp/oxirs_storage` is created if it is missing.
    /// `config` is stored on the engine as-is.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` when the storage directory cannot be created.
    pub async fn new(config: StorageConfig) -> Result<Self, OxirsError> {
        use std::time::Duration;

        let base_path = std::path::PathBuf::from("/tmp/oxirs_storage");
        if let Err(e) = std::fs::create_dir_all(&base_path) {
            return Err(OxirsError::Store(format!(
                "Failed to create storage directory: {e}"
            )));
        }

        let mvcc_store = MvccStore::new(MvccConfig {
            max_versions_per_triple: 100,
            gc_interval: Duration::from_secs(60),
            min_version_age: Duration::from_secs(30),
            enable_snapshot_isolation: true,
            enable_read_your_writes: true,
            conflict_detection: mvcc::ConflictDetection::OptimisticTwoPhase,
        });

        // All four tiers start from the same zeroed record.
        let empty_tier = TierStat {
            triple_count: 0,
            size_bytes: 0,
            hit_rate: 0.0,
            avg_access_time_us: 0,
        };
        let zeroed_stats = StorageStats {
            total_triples: 0,
            total_size_bytes: 0,
            tier_stats: TierStats {
                hot: empty_tier.clone(),
                warm: empty_tier.clone(),
                cold: empty_tier.clone(),
                archive: empty_tier,
            },
            compression_ratio: 1.0,
            query_metrics: QueryMetrics {
                avg_query_time_ms: 0.0,
                p99_query_time_ms: 0.0,
                qps: 0.0,
                cache_hit_rate: 0.0,
            },
        };

        Ok(Self {
            config,
            mvcc_store,
            stats: Arc::new(RwLock::new(zeroed_stats)),
            base_path,
        })
    }
}
/// [`StorageEngine`] implementation that routes every operation through the
/// engine's [`MvccStore`].
///
/// Each operation opens its own transaction at [`IsolationLevel::Snapshot`].
/// Store failures are wrapped in `OxirsError::Store` with a short context
/// message.
///
/// NOTE(review): when an `insert` or `delete` fails mid-transaction, the
/// transaction is left open because no rollback call is visible from this
/// file; confirm whether `MvccStore` offers one and call it on those paths.
#[async_trait::async_trait]
impl StorageEngine for SimpleStorageEngine {
    /// No-op: the engine is fully set up by `SimpleStorageEngine::new`, so the
    /// supplied configuration is ignored.
    async fn init(&mut self, _config: StorageConfig) -> Result<(), OxirsError> {
        Ok(())
    }

    /// Stores one triple in its own transaction.
    ///
    /// Delegates to [`StorageEngine::store_triples`] with a one-element slice
    /// so both paths share the same transaction and bookkeeping logic.
    async fn store_triple(&self, triple: &crate::model::Triple) -> Result<(), OxirsError> {
        self.store_triples(std::slice::from_ref(triple)).await
    }

    /// Stores all `triples` inside a single transaction and commits it.
    ///
    /// On success the total and hot-tier counters grow by `triples.len()`; the
    /// counters are not adjusted for triples that were already present.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` if the transaction cannot be started, an
    /// insert fails, or the commit fails. Counters are left untouched then.
    async fn store_triples(&self, triples: &[crate::model::Triple]) -> Result<(), OxirsError> {
        let tx_id = self
            .mvcc_store
            .begin_transaction(IsolationLevel::Snapshot)
            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
        for triple in triples {
            self.mvcc_store
                .insert(tx_id, triple.clone())
                .map_err(|e| OxirsError::Store(format!("Failed to insert triple: {e}")))?;
        }
        self.mvcc_store
            .commit_transaction(tx_id)
            .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;

        // Update counters only after the commit succeeded.
        let mut stats = self.stats.write();
        stats.total_triples += triples.len() as u64;
        stats.tier_stats.hot.triple_count += triples.len() as u64;
        Ok(())
    }

    /// Returns every stored triple that matches `pattern`.
    ///
    /// Pattern components that convert to concrete terms are passed to the
    /// store as lookup keys; the store's results are then filtered again with
    /// `pattern.matches`.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` if the transaction cannot be started or the
    /// store query fails.
    async fn query_triples(
        &self,
        pattern: &crate::model::TriplePattern,
    ) -> Result<Vec<crate::model::Triple>, OxirsError> {
        let tx_id = self
            .mvcc_store
            .begin_transaction(IsolationLevel::Snapshot)
            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
        let subject = Self::pattern_to_subject(pattern.subject());
        let predicate = Self::pattern_to_predicate(pattern.predicate());
        let object = Self::pattern_to_object(pattern.object());

        let query_result =
            self.mvcc_store
                .query(tx_id, subject.as_ref(), predicate.as_ref(), object.as_ref());

        // End the read-only transaction whether or not the query succeeded;
        // previously it was never finished, so every query left one more open
        // transaction behind. The outcome is deliberately ignored: the results
        // are already materialized and this transaction wrote nothing, so a
        // failure to finish it must not turn a successful read into an error.
        let _ = self.mvcc_store.commit_transaction(tx_id);

        let results = query_result
            .map_err(|e| OxirsError::Store(format!("Failed to query triples: {e}")))?;
        Ok(results
            .into_iter()
            .filter(|triple| pattern.matches(triple))
            .collect())
    }

    /// Deletes every triple matching `pattern` in one transaction and returns
    /// how many were deleted.
    ///
    /// On success the total and hot-tier counters are reduced by that count,
    /// saturating at zero.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` if the transaction cannot be started, the
    /// lookup fails, a delete fails, or the commit fails.
    async fn delete_triples(
        &self,
        pattern: &crate::model::TriplePattern,
    ) -> Result<usize, OxirsError> {
        let tx_id = self
            .mvcc_store
            .begin_transaction(IsolationLevel::Snapshot)
            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
        let subject = Self::pattern_to_subject(pattern.subject());
        let predicate = Self::pattern_to_predicate(pattern.predicate());
        let object = Self::pattern_to_object(pattern.object());
        let matching_triples = self
            .mvcc_store
            .query(tx_id, subject.as_ref(), predicate.as_ref(), object.as_ref())
            .map_err(|e| OxirsError::Store(format!("Failed to query triples for deletion: {e}")))?;
        let filtered: Vec<_> = matching_triples
            .into_iter()
            .filter(|triple| pattern.matches(triple))
            .collect();
        let deleted_count = filtered.len();
        for triple in &filtered {
            self.mvcc_store
                .delete(tx_id, triple)
                .map_err(|e| OxirsError::Store(format!("Failed to delete triple: {e}")))?;
        }
        self.mvcc_store
            .commit_transaction(tx_id)
            .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;

        // Saturating arithmetic keeps the counters from wrapping if they have
        // drifted below the real number of stored triples.
        let mut stats = self.stats.write();
        stats.total_triples = stats.total_triples.saturating_sub(deleted_count as u64);
        stats.tier_stats.hot.triple_count = stats
            .tier_stats
            .hot
            .triple_count
            .saturating_sub(deleted_count as u64);
        Ok(deleted_count)
    }

    /// Returns a clone of the current statistics.
    async fn stats(&self) -> Result<StorageStats, OxirsError> {
        Ok(self.stats.read().clone())
    }

    /// Runs garbage collection on the MVCC store.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` if garbage collection fails.
    async fn optimize(&self) -> Result<(), OxirsError> {
        self.mvcc_store
            .garbage_collect()
            .map_err(|e| OxirsError::Store(format!("Failed to optimize storage: {e}")))?;
        Ok(())
    }

    /// Writes all stored triples as pretty-printed JSON to
    /// `<path>/oxirs_backup.json`, creating the directory `path` if needed.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` if the directory cannot be created or
    /// serialization or the file write fails, and propagates query errors.
    async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
        // Create the target directory first so a fresh backup location works.
        std::fs::create_dir_all(path)
            .map_err(|e| OxirsError::Store(format!("Failed to create backup directory: {e}")))?;
        let backup_path = path.join("oxirs_backup.json");
        let all_pattern = crate::model::TriplePattern::new(None, None, None);
        let triples = self.query_triples(&all_pattern).await?;
        let serialized = serde_json::to_string_pretty(&triples)
            .map_err(|e| OxirsError::Store(format!("Failed to serialize backup: {e}")))?;
        std::fs::write(&backup_path, serialized)
            .map_err(|e| OxirsError::Store(format!("Failed to write backup: {e}")))?;
        Ok(())
    }

    /// Reads `<path>/oxirs_backup.json` and stores every triple it contains.
    ///
    /// Existing triples are not removed first; the backup is added on top of
    /// the current contents.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Store` if the file cannot be read or parsed, and
    /// propagates any error from storing the triples.
    async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
        let backup_path = path.join("oxirs_backup.json");
        let serialized = std::fs::read_to_string(&backup_path)
            .map_err(|e| OxirsError::Store(format!("Failed to read backup: {e}")))?;
        let triples: Vec<crate::model::Triple> = serde_json::from_str(&serialized)
            .map_err(|e| OxirsError::Store(format!("Failed to deserialize backup: {e}")))?;
        self.store_triples(&triples).await?;
        Ok(())
    }
}
impl SimpleStorageEngine {
    /// Converts an optional subject pattern into a concrete subject.
    ///
    /// Yields `None` when no pattern is given or the conversion fails.
    fn pattern_to_subject(
        pattern: Option<&crate::model::pattern::SubjectPattern>,
    ) -> Option<crate::model::Subject> {
        pattern.and_then(|subject_pattern| subject_pattern.try_into().ok())
    }

    /// Converts an optional predicate pattern into a concrete predicate.
    ///
    /// Yields `None` when no pattern is given or the conversion fails.
    fn pattern_to_predicate(
        pattern: Option<&crate::model::pattern::PredicatePattern>,
    ) -> Option<crate::model::Predicate> {
        pattern.and_then(|predicate_pattern| predicate_pattern.try_into().ok())
    }

    /// Converts an optional object pattern into a concrete object.
    ///
    /// Yields `None` when no pattern is given or the conversion fails.
    fn pattern_to_object(
        pattern: Option<&crate::model::pattern::ObjectPattern>,
    ) -> Option<crate::model::Object> {
        pattern.and_then(|object_pattern| object_pattern.try_into().ok())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables every feature flag and uses a cache
    /// size of 1024.
    #[test]
    fn test_default_config() {
        let StorageConfig {
            enable_tiering,
            enable_columnar,
            enable_temporal,
            cache_size_mb,
            ..
        } = StorageConfig::default();

        assert!(enable_tiering);
        assert!(enable_columnar);
        assert!(enable_temporal);
        assert_eq!(cache_size_mb, 1024);
    }
}