1pub mod compression;
15pub mod immutable;
16pub mod mmap_storage;
17pub mod mvcc;
18pub mod temporal;
19#[cfg(feature = "rocksdb")]
20pub mod tiered;
21pub mod virtualization;
22
23pub use mvcc::{IsolationLevel, MvccConfig, MvccStore, TransactionId as MvccTransactionId};
24use parking_lot::RwLock;
25
26use crate::OxirsError;
27use std::path::Path;
28use std::sync::Arc;
29
/// Top-level configuration for the storage subsystem.
///
/// Serializable and deserializable through serde. Within this module the
/// value is only stored by `SimpleStorageEngine`; none of the flags below are
/// read here, so their precise effect must be checked in the consuming code.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StorageConfig {
    /// Tiering toggle; `true` by default.
    pub enable_tiering: bool,
    /// Columnar-storage toggle; `true` by default.
    pub enable_columnar: bool,
    /// Temporal-storage toggle; `true` by default.
    pub enable_temporal: bool,
    /// Selected compression scheme; defaults to `Zstd { level: 3 }`.
    pub compression: CompressionType,
    /// Per-tier settings (hot, warm, cold, archive).
    pub tiers: TierConfig,
    /// Cache size, in megabytes going by the field name; defaults to 1024.
    pub cache_size_mb: usize,
}
46
47impl Default for StorageConfig {
48 fn default() -> Self {
49 StorageConfig {
50 enable_tiering: true,
51 enable_columnar: true,
52 enable_temporal: true,
53 compression: CompressionType::Zstd { level: 3 },
54 tiers: TierConfig::default(),
55 cache_size_mb: 1024,
56 }
57 }
58}
59
/// Compression scheme selectable through `StorageConfig::compression`.
///
/// This enum only names the scheme and its parameters; the code that applies
/// the compression is not part of this module file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CompressionType {
    /// No compression.
    None,
    /// LZ4 with an unsigned level parameter.
    Lz4 { level: u32 },
    /// Zstd with a signed level parameter (the default config uses level 3).
    Zstd { level: i32 },
    /// Custom RDF-oriented scheme parameterised by a dictionary size
    /// (unit of `dictionary_size` is not established here — TODO confirm).
    RdfCustom { dictionary_size: usize },
}
68
/// Groups the settings of the four storage tiers.
///
/// `Default` is derived, so each field takes its own type's default.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct TierConfig {
    /// Settings for the hot tier.
    pub hot_tier: HotTierConfig,
    /// Settings for the warm tier.
    pub warm_tier: WarmTierConfig,
    /// Settings for the cold tier.
    pub cold_tier: ColdTierConfig,
    /// Settings for the archive tier.
    pub archive_tier: ArchiveTierConfig,
}
81
/// Settings for the hot tier.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HotTierConfig {
    /// Size limit, in megabytes going by the field name; defaults to 4096.
    pub max_size_mb: usize,
    /// Eviction policy; defaults to `EvictionPolicy::Lru`.
    pub eviction_policy: EvictionPolicy,
    /// Optional time-to-live in seconds; defaults to `Some(3600)`.
    /// `None` presumably disables expiry — TODO confirm against the consumer.
    pub ttl_seconds: Option<u64>,
}
92
93impl Default for HotTierConfig {
94 fn default() -> Self {
95 HotTierConfig {
96 max_size_mb: 4096,
97 eviction_policy: EvictionPolicy::Lru,
98 ttl_seconds: Some(3600),
99 }
100 }
101}
102
/// Settings for the warm tier.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WarmTierConfig {
    /// Filesystem location for the tier; defaults to `/var/oxirs/warm`.
    pub path: String,
    /// Size limit, in gigabytes going by the field name; defaults to 100.
    pub max_size_gb: usize,
    /// Promotion threshold; defaults to 10.
    /// NOTE(review): what is counted against this threshold is not visible here.
    pub promotion_threshold: u32,
    /// Demotion threshold in days, going by the field name; defaults to 7.
    pub demotion_threshold_days: u32,
}
115
116impl Default for WarmTierConfig {
117 fn default() -> Self {
118 WarmTierConfig {
119 path: "/var/oxirs/warm".to_string(),
120 max_size_gb: 100,
121 promotion_threshold: 10,
122 demotion_threshold_days: 7,
123 }
124 }
125}
126
/// Settings for the cold tier.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColdTierConfig {
    /// Filesystem location for the tier; defaults to `/var/oxirs/cold`.
    pub path: String,
    /// Size limit, in terabytes going by the field name; defaults to 10.
    pub max_size_tb: usize,
    /// Compression level; defaults to 9.
    /// NOTE(review): which codec interprets this level is not visible here.
    pub compression_level: i32,
    /// Archive threshold in days, going by the field name; defaults to 90.
    pub archive_threshold_days: u32,
}
139
140impl Default for ColdTierConfig {
141 fn default() -> Self {
142 ColdTierConfig {
143 path: "/var/oxirs/cold".to_string(),
144 max_size_tb: 10,
145 compression_level: 9,
146 archive_threshold_days: 90,
147 }
148 }
149}
150
/// Settings for the archive tier.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ArchiveTierConfig {
    /// Where archived data is kept; defaults to a local path.
    pub backend: ArchiveBackend,
    /// Optional retention period in years; defaults to `Some(7)`.
    /// `None` presumably means no retention limit — TODO confirm.
    pub retention_years: Option<u32>,
    /// Immutability flag; defaults to `true`.
    pub immutable: bool,
}
161
162impl Default for ArchiveTierConfig {
163 fn default() -> Self {
164 ArchiveTierConfig {
165 backend: ArchiveBackend::Local("/var/oxirs/archive".to_string()),
166 retention_years: Some(7),
167 immutable: true,
168 }
169 }
170}
171
/// Destination used by the archive tier.
///
/// Only the addressing information is modelled here; no client code for any
/// of these backends appears in this module file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ArchiveBackend {
    /// Local storage identified by a path string.
    Local(String),
    /// S3 destination identified by bucket and key prefix.
    S3 { bucket: String, prefix: String },
    /// GCS destination identified by bucket and prefix.
    GCS { bucket: String, prefix: String },
    /// Azure destination identified by container and prefix.
    Azure { container: String, prefix: String },
}
180
/// Eviction policy selector used by `HotTierConfig::eviction_policy`.
///
/// The variants are plain tags; the eviction logic itself lives elsewhere.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum EvictionPolicy {
    /// `Lru` policy (the hot tier's default).
    Lru,
    /// `Lfu` policy.
    Lfu,
    /// `Fifo` policy.
    Fifo,
    /// `Adaptive` policy; the adaptation strategy is not defined in this file.
    Adaptive,
}
189
/// Asynchronous interface implemented by storage backends.
///
/// Implementors must be `Send + Sync`. Every method reports failure as an
/// [`OxirsError`].
#[async_trait::async_trait]
pub trait StorageEngine: Send + Sync {
    /// Initialises the engine with `config`. Requires exclusive access.
    async fn init(&mut self, config: StorageConfig) -> Result<(), OxirsError>;

    /// Stores a single triple.
    async fn store_triple(&self, triple: &crate::model::Triple) -> Result<(), OxirsError>;

    /// Stores a batch of triples.
    async fn store_triples(&self, triples: &[crate::model::Triple]) -> Result<(), OxirsError>;

    /// Returns the triples selected by `pattern`.
    async fn query_triples(
        &self,
        pattern: &crate::model::TriplePattern,
    ) -> Result<Vec<crate::model::Triple>, OxirsError>;

    /// Deletes the triples selected by `pattern` and returns a count
    /// (the implementation in this file returns the number of triples deleted).
    async fn delete_triples(
        &self,
        pattern: &crate::model::TriplePattern,
    ) -> Result<usize, OxirsError>;

    /// Returns storage statistics.
    async fn stats(&self) -> Result<StorageStats, OxirsError>;

    /// Runs backend-specific optimisation.
    async fn optimize(&self) -> Result<(), OxirsError>;

    /// Writes a backup under `path`.
    async fn backup(&self, path: &Path) -> Result<(), OxirsError>;

    /// Loads a backup from `path`.
    async fn restore(&self, path: &Path) -> Result<(), OxirsError>;
}
226
/// Snapshot of storage statistics returned by `StorageEngine::stats`.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Total number of triples tracked.
    pub total_triples: u64,
    /// Total size in bytes.
    pub total_size_bytes: u64,
    /// Per-tier breakdown.
    pub tier_stats: TierStats,
    /// Compression ratio; `SimpleStorageEngine` initialises it to 1.0.
    /// NOTE(review): direction of the ratio (raw/compressed or inverse) is
    /// not established in this file.
    pub compression_ratio: f64,
    /// Query performance figures.
    pub query_metrics: QueryMetrics,
}
241
/// Statistics for each of the four storage tiers.
#[derive(Debug, Clone)]
pub struct TierStats {
    /// Hot-tier statistics.
    pub hot: TierStat,
    /// Warm-tier statistics.
    pub warm: TierStat,
    /// Cold-tier statistics.
    pub cold: TierStat,
    /// Archive-tier statistics.
    pub archive: TierStat,
}
254
/// Statistics for a single storage tier.
#[derive(Debug, Clone)]
pub struct TierStat {
    /// Number of triples in the tier.
    pub triple_count: u64,
    /// Size of the tier in bytes.
    pub size_bytes: u64,
    /// Hit rate (whether expressed as a fraction or a percentage is not
    /// established in this file — TODO confirm).
    pub hit_rate: f64,
    /// Average access time, in microseconds going by the field name.
    pub avg_access_time_us: u64,
}
267
/// Query performance figures carried inside `StorageStats`.
///
/// `SimpleStorageEngine` initialises every field to 0.0 and does not update
/// them anywhere in this file.
#[derive(Debug, Clone)]
pub struct QueryMetrics {
    /// Average query time, in milliseconds going by the field name.
    pub avg_query_time_ms: f64,
    /// 99th-percentile query time, in milliseconds going by the field name.
    pub p99_query_time_ms: f64,
    /// Queries per second.
    pub qps: f64,
    /// Cache hit rate (scale not established here — TODO confirm).
    pub cache_hit_rate: f64,
}
280
281pub async fn create_engine(config: StorageConfig) -> Result<Arc<dyn StorageEngine>, OxirsError> {
283 let engine = SimpleStorageEngine::new(config).await?;
284 Ok(Arc::new(engine))
285}
286
/// Storage engine that keeps triples in an [`MvccStore`] and tracks counters
/// in a lock-protected [`StorageStats`].
pub struct SimpleStorageEngine {
    /// Configuration supplied at construction; stored but not read in this file.
    #[allow(dead_code)]
    config: StorageConfig,
    /// Backing store; all reads and writes go through its transactions.
    mvcc_store: MvccStore,
    /// Shared, lock-protected statistics updated after successful writes and deletes.
    stats: Arc<RwLock<StorageStats>>,
    /// Directory created at construction time; stored but not read in this file.
    #[allow(dead_code)]
    base_path: std::path::PathBuf,
}
296
297impl SimpleStorageEngine {
298 pub async fn new(config: StorageConfig) -> Result<Self, OxirsError> {
300 let base_path = std::path::PathBuf::from("/tmp/oxirs_storage");
301 std::fs::create_dir_all(&base_path)
302 .map_err(|e| OxirsError::Store(format!("Failed to create storage directory: {e}")))?;
303
304 let mvcc_config = MvccConfig {
305 max_versions_per_triple: 100,
306 gc_interval: std::time::Duration::from_secs(60),
307 min_version_age: std::time::Duration::from_secs(30),
308 enable_snapshot_isolation: true,
309 enable_read_your_writes: true,
310 conflict_detection: mvcc::ConflictDetection::OptimisticTwoPhase,
311 };
312
313 let mvcc_store = MvccStore::new(mvcc_config);
314
315 let initial_stats = StorageStats {
316 total_triples: 0,
317 total_size_bytes: 0,
318 tier_stats: TierStats {
319 hot: TierStat {
320 triple_count: 0,
321 size_bytes: 0,
322 hit_rate: 0.0,
323 avg_access_time_us: 0,
324 },
325 warm: TierStat {
326 triple_count: 0,
327 size_bytes: 0,
328 hit_rate: 0.0,
329 avg_access_time_us: 0,
330 },
331 cold: TierStat {
332 triple_count: 0,
333 size_bytes: 0,
334 hit_rate: 0.0,
335 avg_access_time_us: 0,
336 },
337 archive: TierStat {
338 triple_count: 0,
339 size_bytes: 0,
340 hit_rate: 0.0,
341 avg_access_time_us: 0,
342 },
343 },
344 compression_ratio: 1.0,
345 query_metrics: QueryMetrics {
346 avg_query_time_ms: 0.0,
347 p99_query_time_ms: 0.0,
348 qps: 0.0,
349 cache_hit_rate: 0.0,
350 },
351 };
352
353 Ok(Self {
354 config,
355 mvcc_store,
356 stats: Arc::new(RwLock::new(initial_stats)),
357 base_path,
358 })
359 }
360}
361
362#[async_trait::async_trait]
363impl StorageEngine for SimpleStorageEngine {
364 async fn init(&mut self, _config: StorageConfig) -> Result<(), OxirsError> {
365 Ok(())
367 }
368
369 async fn store_triple(&self, triple: &crate::model::Triple) -> Result<(), OxirsError> {
370 let tx_id = self
371 .mvcc_store
372 .begin_transaction(IsolationLevel::Snapshot)
373 .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
374
375 self.mvcc_store
376 .insert(tx_id, triple.clone())
377 .map_err(|e| OxirsError::Store(format!("Failed to insert triple: {e}")))?;
378
379 self.mvcc_store
380 .commit_transaction(tx_id)
381 .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;
382
383 let mut stats = self.stats.write();
385 stats.total_triples += 1;
386 stats.tier_stats.hot.triple_count += 1;
387
388 Ok(())
389 }
390
391 async fn store_triples(&self, triples: &[crate::model::Triple]) -> Result<(), OxirsError> {
392 let tx_id = self
393 .mvcc_store
394 .begin_transaction(IsolationLevel::Snapshot)
395 .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
396
397 for triple in triples {
398 self.mvcc_store
399 .insert(tx_id, triple.clone())
400 .map_err(|e| OxirsError::Store(format!("Failed to insert triple: {e}")))?;
401 }
402
403 self.mvcc_store
404 .commit_transaction(tx_id)
405 .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;
406
407 let mut stats = self.stats.write();
409 stats.total_triples += triples.len() as u64;
410 stats.tier_stats.hot.triple_count += triples.len() as u64;
411
412 Ok(())
413 }
414
415 async fn query_triples(
416 &self,
417 pattern: &crate::model::TriplePattern,
418 ) -> Result<Vec<crate::model::Triple>, OxirsError> {
419 let tx_id = self
420 .mvcc_store
421 .begin_transaction(IsolationLevel::Snapshot)
422 .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
423
424 let subject = Self::pattern_to_subject(pattern.subject());
426 let predicate = Self::pattern_to_predicate(pattern.predicate());
427 let object = Self::pattern_to_object(pattern.object());
428
429 let results = self
430 .mvcc_store
431 .query(tx_id, subject.as_ref(), predicate.as_ref(), object.as_ref())
432 .map_err(|e| OxirsError::Store(format!("Failed to query triples: {e}")))?;
433
434 let filtered: Vec<_> = results
436 .into_iter()
437 .filter(|triple| pattern.matches(triple))
438 .collect();
439
440 Ok(filtered)
441 }
442
443 async fn delete_triples(
444 &self,
445 pattern: &crate::model::TriplePattern,
446 ) -> Result<usize, OxirsError> {
447 let tx_id = self
448 .mvcc_store
449 .begin_transaction(IsolationLevel::Snapshot)
450 .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
451
452 let subject = Self::pattern_to_subject(pattern.subject());
454 let predicate = Self::pattern_to_predicate(pattern.predicate());
455 let object = Self::pattern_to_object(pattern.object());
456
457 let matching_triples = self
459 .mvcc_store
460 .query(tx_id, subject.as_ref(), predicate.as_ref(), object.as_ref())
461 .map_err(|e| OxirsError::Store(format!("Failed to query triples for deletion: {e}")))?;
462
463 let filtered: Vec<_> = matching_triples
465 .into_iter()
466 .filter(|triple| pattern.matches(triple))
467 .collect();
468
469 let deleted_count = filtered.len();
470
471 for triple in &filtered {
473 self.mvcc_store
474 .delete(tx_id, triple)
475 .map_err(|e| OxirsError::Store(format!("Failed to delete triple: {e}")))?;
476 }
477
478 self.mvcc_store
479 .commit_transaction(tx_id)
480 .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;
481
482 let mut stats = self.stats.write();
484 stats.total_triples = stats.total_triples.saturating_sub(deleted_count as u64);
485 stats.tier_stats.hot.triple_count = stats
486 .tier_stats
487 .hot
488 .triple_count
489 .saturating_sub(deleted_count as u64);
490
491 Ok(deleted_count)
492 }
493
494 async fn stats(&self) -> Result<StorageStats, OxirsError> {
495 let stats = self.stats.read();
496 Ok(stats.clone())
497 }
498
499 async fn optimize(&self) -> Result<(), OxirsError> {
500 self.mvcc_store
502 .garbage_collect()
503 .map_err(|e| OxirsError::Store(format!("Failed to optimize storage: {e}")))?;
504 Ok(())
505 }
506
507 async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
508 let backup_path = path.join("oxirs_backup.json");
510
511 let all_pattern = crate::model::TriplePattern::new(None, None, None);
513 let triples = self.query_triples(&all_pattern).await?;
514
515 let serialized = serde_json::to_string_pretty(&triples)
516 .map_err(|e| OxirsError::Store(format!("Failed to serialize backup: {e}")))?;
517
518 std::fs::write(&backup_path, serialized)
519 .map_err(|e| OxirsError::Store(format!("Failed to write backup: {e}")))?;
520
521 Ok(())
522 }
523
524 async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
525 let backup_path = path.join("oxirs_backup.json");
527
528 let serialized = std::fs::read_to_string(&backup_path)
529 .map_err(|e| OxirsError::Store(format!("Failed to read backup: {e}")))?;
530
531 let triples: Vec<crate::model::Triple> = serde_json::from_str(&serialized)
532 .map_err(|e| OxirsError::Store(format!("Failed to deserialize backup: {e}")))?;
533
534 self.store_triples(&triples).await?;
535
536 Ok(())
537 }
538}
539
540impl SimpleStorageEngine {
541 fn pattern_to_subject(
543 pattern: Option<&crate::model::pattern::SubjectPattern>,
544 ) -> Option<crate::model::Subject> {
545 pattern?.try_into().ok()
546 }
547
548 fn pattern_to_predicate(
550 pattern: Option<&crate::model::pattern::PredicatePattern>,
551 ) -> Option<crate::model::Predicate> {
552 pattern?.try_into().ok()
553 }
554
555 fn pattern_to_object(
557 pattern: Option<&crate::model::pattern::ObjectPattern>,
558 ) -> Option<crate::model::Object> {
559 pattern?.try_into().ok()
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables all three feature flags and uses a
    /// 1024 MB cache.
    #[test]
    fn test_default_config() {
        let StorageConfig {
            enable_tiering,
            enable_columnar,
            enable_temporal,
            cache_size_mb,
            ..
        } = StorageConfig::default();

        assert!(enable_tiering);
        assert!(enable_columnar);
        assert!(enable_temporal);
        assert_eq!(cache_size_mb, 1024);
    }
}