Skip to main content

oxirs_core/storage/
mod.rs

//! Next-Generation Storage Engine for OxiRS
//!
//! This module implements a quantum-ready storage architecture with:
//! - Tiered storage with intelligent data placement
//! - Columnar storage for analytical workloads
//! - Time-series optimization for temporal RDF
//! - Immutable storage with content-addressable blocks
//! - Advanced compression (LZ4, ZSTD, custom RDF codecs)
//! - Storage virtualization with transparent migration
//! - Multi-Version Concurrency Control (MVCC) for high-concurrency operations
11
12// #[cfg(feature = "columnar")]
13// pub mod columnar; // TODO: Add 'columnar' feature to Cargo.toml when ready
14pub mod compression;
15pub mod immutable;
16pub mod mmap_storage;
17pub mod mvcc;
18pub mod temporal;
19#[cfg(feature = "rocksdb")]
20pub mod tiered;
21pub mod virtualization;
22
23pub use mvcc::{IsolationLevel, MvccConfig, MvccStore, TransactionId as MvccTransactionId};
24use parking_lot::RwLock;
25
26use crate::OxirsError;
27use std::path::Path;
28use std::sync::Arc;
29
30/// Storage configuration
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub struct StorageConfig {
33    /// Enable tiered storage
34    pub enable_tiering: bool,
35    /// Enable columnar storage for analytics
36    pub enable_columnar: bool,
37    /// Enable temporal optimization
38    pub enable_temporal: bool,
39    /// Compression algorithm
40    pub compression: CompressionType,
41    /// Storage tiers configuration
42    pub tiers: TierConfig,
43    /// Cache size in MB
44    pub cache_size_mb: usize,
45}
46
47impl Default for StorageConfig {
48    fn default() -> Self {
49        StorageConfig {
50            enable_tiering: true,
51            enable_columnar: true,
52            enable_temporal: true,
53            compression: CompressionType::Zstd { level: 3 },
54            tiers: TierConfig::default(),
55            cache_size_mb: 1024,
56        }
57    }
58}
59
60/// Compression types
61#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
62pub enum CompressionType {
63    None,
64    Lz4 { level: u32 },
65    Zstd { level: i32 },
66    RdfCustom { dictionary_size: usize },
67}
68
69/// Storage tier configuration
70#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
71pub struct TierConfig {
72    /// Hot tier: in-memory with fastest access
73    pub hot_tier: HotTierConfig,
74    /// Warm tier: SSD-optimized storage
75    pub warm_tier: WarmTierConfig,
76    /// Cold tier: HDD/object storage
77    pub cold_tier: ColdTierConfig,
78    /// Archive tier: long-term immutable storage
79    pub archive_tier: ArchiveTierConfig,
80}
81
82/// Hot tier configuration
83#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
84pub struct HotTierConfig {
85    /// Maximum size in MB
86    pub max_size_mb: usize,
87    /// Eviction policy
88    pub eviction_policy: EvictionPolicy,
89    /// Time to live in seconds
90    pub ttl_seconds: Option<u64>,
91}
92
93impl Default for HotTierConfig {
94    fn default() -> Self {
95        HotTierConfig {
96            max_size_mb: 4096,
97            eviction_policy: EvictionPolicy::Lru,
98            ttl_seconds: Some(3600),
99        }
100    }
101}
102
103/// Warm tier configuration  
104#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
105pub struct WarmTierConfig {
106    /// Path to warm storage
107    pub path: String,
108    /// Maximum size in GB
109    pub max_size_gb: usize,
110    /// Promotion threshold (access count)
111    pub promotion_threshold: u32,
112    /// Demotion threshold (days since last access)
113    pub demotion_threshold_days: u32,
114}
115
116impl Default for WarmTierConfig {
117    fn default() -> Self {
118        WarmTierConfig {
119            path: "/var/oxirs/warm".to_string(),
120            max_size_gb: 100,
121            promotion_threshold: 10,
122            demotion_threshold_days: 7,
123        }
124    }
125}
126
127/// Cold tier configuration
128#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
129pub struct ColdTierConfig {
130    /// Path to cold storage
131    pub path: String,
132    /// Maximum size in TB
133    pub max_size_tb: usize,
134    /// Compression level
135    pub compression_level: i32,
136    /// Archive threshold (days since last access)
137    pub archive_threshold_days: u32,
138}
139
140impl Default for ColdTierConfig {
141    fn default() -> Self {
142        ColdTierConfig {
143            path: "/var/oxirs/cold".to_string(),
144            max_size_tb: 10,
145            compression_level: 9,
146            archive_threshold_days: 90,
147        }
148    }
149}
150
151/// Archive tier configuration
152#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
153pub struct ArchiveTierConfig {
154    /// Archive storage backend
155    pub backend: ArchiveBackend,
156    /// Retention policy
157    pub retention_years: Option<u32>,
158    /// Immutability guarantee
159    pub immutable: bool,
160}
161
162impl Default for ArchiveTierConfig {
163    fn default() -> Self {
164        ArchiveTierConfig {
165            backend: ArchiveBackend::Local("/var/oxirs/archive".to_string()),
166            retention_years: Some(7),
167            immutable: true,
168        }
169    }
170}
171
172/// Archive storage backend
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174pub enum ArchiveBackend {
175    Local(String),
176    S3 { bucket: String, prefix: String },
177    GCS { bucket: String, prefix: String },
178    Azure { container: String, prefix: String },
179}
180
181/// Eviction policy for hot tier
182#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
183pub enum EvictionPolicy {
184    Lru,
185    Lfu,
186    Fifo,
187    Adaptive,
188}
189
190/// Storage engine trait
191#[async_trait::async_trait]
192pub trait StorageEngine: Send + Sync {
193    /// Initialize the storage engine
194    async fn init(&mut self, config: StorageConfig) -> Result<(), OxirsError>;
195
196    /// Store a triple
197    async fn store_triple(&self, triple: &crate::model::Triple) -> Result<(), OxirsError>;
198
199    /// Store multiple triples
200    async fn store_triples(&self, triples: &[crate::model::Triple]) -> Result<(), OxirsError>;
201
202    /// Query triples by pattern
203    async fn query_triples(
204        &self,
205        pattern: &crate::model::TriplePattern,
206    ) -> Result<Vec<crate::model::Triple>, OxirsError>;
207
208    /// Delete triples by pattern
209    async fn delete_triples(
210        &self,
211        pattern: &crate::model::TriplePattern,
212    ) -> Result<usize, OxirsError>;
213
214    /// Get storage statistics
215    async fn stats(&self) -> Result<StorageStats, OxirsError>;
216
217    /// Optimize storage
218    async fn optimize(&self) -> Result<(), OxirsError>;
219
220    /// Backup storage
221    async fn backup(&self, path: &Path) -> Result<(), OxirsError>;
222
223    /// Restore from backup
224    async fn restore(&self, path: &Path) -> Result<(), OxirsError>;
225}
226
227/// Storage statistics
228#[derive(Debug, Clone)]
229pub struct StorageStats {
230    /// Total number of triples
231    pub total_triples: u64,
232    /// Storage size in bytes
233    pub total_size_bytes: u64,
234    /// Tier distribution
235    pub tier_stats: TierStats,
236    /// Compression ratio
237    pub compression_ratio: f64,
238    /// Query performance metrics
239    pub query_metrics: QueryMetrics,
240}
241
242/// Tier statistics
243#[derive(Debug, Clone)]
244pub struct TierStats {
245    /// Hot tier stats
246    pub hot: TierStat,
247    /// Warm tier stats
248    pub warm: TierStat,
249    /// Cold tier stats
250    pub cold: TierStat,
251    /// Archive tier stats
252    pub archive: TierStat,
253}
254
/// Statistics for a single storage tier.
///
/// `Default` yields an all-zero record, which is the state of a tier that has
/// not been used yet; `PartialEq` allows records to be compared directly
/// (for example in tests).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TierStat {
    /// Number of triples held in the tier.
    pub triple_count: u64,
    /// Size of the tier, in bytes.
    pub size_bytes: u64,
    /// Hit rate, as a percentage.
    pub hit_rate: f64,
    /// Average access time, in microseconds.
    pub avg_access_time_us: u64,
}
267
/// Query performance figures.
///
/// `Default` yields an all-zero record (no queries measured yet);
/// `PartialEq` allows records to be compared directly (for example in tests).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct QueryMetrics {
    /// Average query time, in milliseconds.
    pub avg_query_time_ms: f64,
    /// 99th-percentile query time, in milliseconds.
    pub p99_query_time_ms: f64,
    /// Queries per second.
    pub qps: f64,
    /// Cache hit rate.
    pub cache_hit_rate: f64,
}
280
281/// Create a new storage engine with the given configuration
282pub async fn create_engine(config: StorageConfig) -> Result<Arc<dyn StorageEngine>, OxirsError> {
283    let engine = SimpleStorageEngine::new(config).await?;
284    Ok(Arc::new(engine))
285}
286
287/// Simple file-based storage engine implementation
288pub struct SimpleStorageEngine {
289    #[allow(dead_code)]
290    config: StorageConfig,
291    mvcc_store: MvccStore,
292    stats: Arc<RwLock<StorageStats>>,
293    #[allow(dead_code)]
294    base_path: std::path::PathBuf,
295}
296
297impl SimpleStorageEngine {
298    /// Create a new simple storage engine
299    pub async fn new(config: StorageConfig) -> Result<Self, OxirsError> {
300        let base_path = std::path::PathBuf::from("/tmp/oxirs_storage");
301        std::fs::create_dir_all(&base_path)
302            .map_err(|e| OxirsError::Store(format!("Failed to create storage directory: {e}")))?;
303
304        let mvcc_config = MvccConfig {
305            max_versions_per_triple: 100,
306            gc_interval: std::time::Duration::from_secs(60),
307            min_version_age: std::time::Duration::from_secs(30),
308            enable_snapshot_isolation: true,
309            enable_read_your_writes: true,
310            conflict_detection: mvcc::ConflictDetection::OptimisticTwoPhase,
311        };
312
313        let mvcc_store = MvccStore::new(mvcc_config);
314
315        let initial_stats = StorageStats {
316            total_triples: 0,
317            total_size_bytes: 0,
318            tier_stats: TierStats {
319                hot: TierStat {
320                    triple_count: 0,
321                    size_bytes: 0,
322                    hit_rate: 0.0,
323                    avg_access_time_us: 0,
324                },
325                warm: TierStat {
326                    triple_count: 0,
327                    size_bytes: 0,
328                    hit_rate: 0.0,
329                    avg_access_time_us: 0,
330                },
331                cold: TierStat {
332                    triple_count: 0,
333                    size_bytes: 0,
334                    hit_rate: 0.0,
335                    avg_access_time_us: 0,
336                },
337                archive: TierStat {
338                    triple_count: 0,
339                    size_bytes: 0,
340                    hit_rate: 0.0,
341                    avg_access_time_us: 0,
342                },
343            },
344            compression_ratio: 1.0,
345            query_metrics: QueryMetrics {
346                avg_query_time_ms: 0.0,
347                p99_query_time_ms: 0.0,
348                qps: 0.0,
349                cache_hit_rate: 0.0,
350            },
351        };
352
353        Ok(Self {
354            config,
355            mvcc_store,
356            stats: Arc::new(RwLock::new(initial_stats)),
357            base_path,
358        })
359    }
360}
361
362#[async_trait::async_trait]
363impl StorageEngine for SimpleStorageEngine {
364    async fn init(&mut self, _config: StorageConfig) -> Result<(), OxirsError> {
365        // Already initialized in new()
366        Ok(())
367    }
368
369    async fn store_triple(&self, triple: &crate::model::Triple) -> Result<(), OxirsError> {
370        let tx_id = self
371            .mvcc_store
372            .begin_transaction(IsolationLevel::Snapshot)
373            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
374
375        self.mvcc_store
376            .insert(tx_id, triple.clone())
377            .map_err(|e| OxirsError::Store(format!("Failed to insert triple: {e}")))?;
378
379        self.mvcc_store
380            .commit_transaction(tx_id)
381            .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;
382
383        // Update statistics
384        let mut stats = self.stats.write();
385        stats.total_triples += 1;
386        stats.tier_stats.hot.triple_count += 1;
387
388        Ok(())
389    }
390
391    async fn store_triples(&self, triples: &[crate::model::Triple]) -> Result<(), OxirsError> {
392        let tx_id = self
393            .mvcc_store
394            .begin_transaction(IsolationLevel::Snapshot)
395            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
396
397        for triple in triples {
398            self.mvcc_store
399                .insert(tx_id, triple.clone())
400                .map_err(|e| OxirsError::Store(format!("Failed to insert triple: {e}")))?;
401        }
402
403        self.mvcc_store
404            .commit_transaction(tx_id)
405            .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;
406
407        // Update statistics
408        let mut stats = self.stats.write();
409        stats.total_triples += triples.len() as u64;
410        stats.tier_stats.hot.triple_count += triples.len() as u64;
411
412        Ok(())
413    }
414
415    async fn query_triples(
416        &self,
417        pattern: &crate::model::TriplePattern,
418    ) -> Result<Vec<crate::model::Triple>, OxirsError> {
419        let tx_id = self
420            .mvcc_store
421            .begin_transaction(IsolationLevel::Snapshot)
422            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
423
424        // Convert patterns to concrete terms for MVCC query
425        let subject = Self::pattern_to_subject(pattern.subject());
426        let predicate = Self::pattern_to_predicate(pattern.predicate());
427        let object = Self::pattern_to_object(pattern.object());
428
429        let results = self
430            .mvcc_store
431            .query(tx_id, subject.as_ref(), predicate.as_ref(), object.as_ref())
432            .map_err(|e| OxirsError::Store(format!("Failed to query triples: {e}")))?;
433
434        // Filter results to match the pattern (in case of variables)
435        let filtered: Vec<_> = results
436            .into_iter()
437            .filter(|triple| pattern.matches(triple))
438            .collect();
439
440        Ok(filtered)
441    }
442
443    async fn delete_triples(
444        &self,
445        pattern: &crate::model::TriplePattern,
446    ) -> Result<usize, OxirsError> {
447        let tx_id = self
448            .mvcc_store
449            .begin_transaction(IsolationLevel::Snapshot)
450            .map_err(|e| OxirsError::Store(format!("Failed to begin transaction: {e}")))?;
451
452        // Convert patterns to concrete terms for MVCC query
453        let subject = Self::pattern_to_subject(pattern.subject());
454        let predicate = Self::pattern_to_predicate(pattern.predicate());
455        let object = Self::pattern_to_object(pattern.object());
456
457        // First query to find matching triples
458        let matching_triples = self
459            .mvcc_store
460            .query(tx_id, subject.as_ref(), predicate.as_ref(), object.as_ref())
461            .map_err(|e| OxirsError::Store(format!("Failed to query triples for deletion: {e}")))?;
462
463        // Filter results to match the pattern exactly
464        let filtered: Vec<_> = matching_triples
465            .into_iter()
466            .filter(|triple| pattern.matches(triple))
467            .collect();
468
469        let deleted_count = filtered.len();
470
471        // Delete each matching triple
472        for triple in &filtered {
473            self.mvcc_store
474                .delete(tx_id, triple)
475                .map_err(|e| OxirsError::Store(format!("Failed to delete triple: {e}")))?;
476        }
477
478        self.mvcc_store
479            .commit_transaction(tx_id)
480            .map_err(|e| OxirsError::Store(format!("Failed to commit transaction: {e}")))?;
481
482        // Update statistics
483        let mut stats = self.stats.write();
484        stats.total_triples = stats.total_triples.saturating_sub(deleted_count as u64);
485        stats.tier_stats.hot.triple_count = stats
486            .tier_stats
487            .hot
488            .triple_count
489            .saturating_sub(deleted_count as u64);
490
491        Ok(deleted_count)
492    }
493
494    async fn stats(&self) -> Result<StorageStats, OxirsError> {
495        let stats = self.stats.read();
496        Ok(stats.clone())
497    }
498
499    async fn optimize(&self) -> Result<(), OxirsError> {
500        // Run garbage collection on MVCC store
501        self.mvcc_store
502            .garbage_collect()
503            .map_err(|e| OxirsError::Store(format!("Failed to optimize storage: {e}")))?;
504        Ok(())
505    }
506
507    async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
508        // Simple backup implementation - serialize current state to file
509        let backup_path = path.join("oxirs_backup.json");
510
511        // Get all triples using a query that matches everything
512        let all_pattern = crate::model::TriplePattern::new(None, None, None);
513        let triples = self.query_triples(&all_pattern).await?;
514
515        let serialized = serde_json::to_string_pretty(&triples)
516            .map_err(|e| OxirsError::Store(format!("Failed to serialize backup: {e}")))?;
517
518        std::fs::write(&backup_path, serialized)
519            .map_err(|e| OxirsError::Store(format!("Failed to write backup: {e}")))?;
520
521        Ok(())
522    }
523
524    async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
525        // Simple restore implementation - deserialize from file
526        let backup_path = path.join("oxirs_backup.json");
527
528        let serialized = std::fs::read_to_string(&backup_path)
529            .map_err(|e| OxirsError::Store(format!("Failed to read backup: {e}")))?;
530
531        let triples: Vec<crate::model::Triple> = serde_json::from_str(&serialized)
532            .map_err(|e| OxirsError::Store(format!("Failed to deserialize backup: {e}")))?;
533
534        self.store_triples(&triples).await?;
535
536        Ok(())
537    }
538}
539
540impl SimpleStorageEngine {
541    /// Convert a subject pattern to a concrete subject term
542    fn pattern_to_subject(
543        pattern: Option<&crate::model::pattern::SubjectPattern>,
544    ) -> Option<crate::model::Subject> {
545        pattern?.try_into().ok()
546    }
547
548    /// Convert a predicate pattern to a concrete predicate term
549    fn pattern_to_predicate(
550        pattern: Option<&crate::model::pattern::PredicatePattern>,
551    ) -> Option<crate::model::Predicate> {
552        pattern?.try_into().ok()
553    }
554
555    /// Convert an object pattern to a concrete object term
556    fn pattern_to_object(
557        pattern: Option<&crate::model::pattern::ObjectPattern>,
558    ) -> Option<crate::model::Object> {
559        pattern?.try_into().ok()
560    }
561}
562
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration turns on the three feature flags and uses a
    /// 1024 MB cache.
    #[test]
    fn test_default_config() {
        let StorageConfig {
            enable_tiering,
            enable_columnar,
            enable_temporal,
            cache_size_mb,
            ..
        } = StorageConfig::default();

        assert!(enable_tiering);
        assert!(enable_columnar);
        assert!(enable_temporal);
        assert_eq!(cache_size_mb, 1024);
    }
}
575}