1use crate::model::{Triple, TriplePattern};
7use crate::storage::StorageEngine;
8use crate::OxirsError;
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
/// Top-level configuration for the virtual storage layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VirtualStorageConfig {
    /// Root directory for virtual-storage files (created on `new`).
    pub path: PathBuf,
    /// Backends to instantiate during `initialize_backends`, in declaration order.
    pub backends: Vec<BackendConfig>,
    /// How reads and writes are distributed across backends.
    pub routing: RoutingPolicy,
    /// Policy governing data migration between backends.
    pub migration: MigrationPolicy,
    /// Enable the in-memory triple/query caches.
    pub caching: bool,
    /// Cache budget in megabytes (translated into an LRU entry count).
    pub cache_size_mb: usize,
}
33
impl Default for VirtualStorageConfig {
    /// Default: a single tiered backend named "primary" with hot/warm/cold/
    /// archive tiers under /tmp, broadcast-style routing defaults, and a
    /// 1 GiB cache budget.
    fn default() -> Self {
        VirtualStorageConfig {
            path: PathBuf::from("/var/oxirs/virtual"),
            backends: vec![BackendConfig {
                name: "primary".to_string(),
                backend_type: BackendType::Tiered,
                // Backend-specific config is opaque JSON, interpreted by the
                // concrete backend implementation (see `create_backend`).
                config: serde_json::json!({
                    "enable_tiering": true,
                    "enable_columnar": false,
                    "enable_temporal": false,
                    "compression": {
                        "Zstd": { "level": 3 }
                    },
                    "tiers": {
                        "hot_tier": {
                            "max_size_mb": 1024,
                            "eviction_policy": "Lru",
                            "ttl_seconds": 3600
                        },
                        "warm_tier": {
                            "path": "/tmp/oxirs_virtual_warm",
                            "max_size_gb": 10,
                            "promotion_threshold": 10,
                            "demotion_threshold_days": 7
                        },
                        "cold_tier": {
                            "path": "/tmp/oxirs_virtual_cold",
                            "max_size_tb": 1,
                            "compression_level": 9,
                            "archive_threshold_days": 90
                        },
                        "archive_tier": {
                            "backend": { "Local": "/tmp/oxirs_virtual_archive" },
                            "retention_years": 7,
                            "immutable": true
                        }
                    },
                    "cache_size_mb": 512
                }),
                weight: 1.0,
                read_only: false,
            }],
            routing: RoutingPolicy::default(),
            migration: MigrationPolicy::default(),
            caching: true,
            cache_size_mb: 1024,
        }
    }
}
84
/// Configuration for one storage backend managed by [`VirtualStorage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendConfig {
    /// Unique backend name, used as the routing/health key ("primary" is
    /// special-cased by some write strategies).
    pub name: String,
    /// Which backend implementation to instantiate.
    pub backend_type: BackendType,
    /// Opaque backend-specific configuration blob.
    pub config: serde_json::Value,
    /// Routing weight (declared for weighted strategies; not yet consumed
    /// by the visible routing code).
    pub weight: f64,
    /// When true, this backend is excluded from write routing.
    pub read_only: bool,
}
99
/// Kinds of storage backend the virtual layer can (eventually) host.
/// NOTE: all variants currently fail in `create_backend` — none is wired up yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackendType {
    /// Hot/warm/cold/archive tiered store.
    Tiered,
    /// Column-oriented store.
    Columnar,
    /// Append-only / immutable store.
    Immutable,
    /// Time-versioned store.
    Temporal,
    /// Remote store reached over the network.
    Remote { endpoint: String },
    /// Cloud object-storage backed store.
    Cloud { provider: CloudProvider },
}
110
/// Supported cloud object-storage providers and their addressing info.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CloudProvider {
    AWS { bucket: String, region: String },
    GCP { bucket: String, project: String },
    Azure { container: String, account: String },
}
118
/// How operations are routed across the configured backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingPolicy {
    /// Strategy for selecting backends on reads.
    pub read_strategy: ReadStrategy,
    /// Strategy for selecting backends on writes.
    pub write_strategy: WriteStrategy,
    /// Free-form hints (declared for future use; not read by the visible
    /// routing code).
    pub query_hints: HashMap<String, String>,
    /// Predicate-pattern → backend-name rules (declared for `PatternBased`
    /// routing; not read by the visible routing code).
    pub predicate_rules: HashMap<String, String>,
}
131
132impl Default for RoutingPolicy {
133 fn default() -> Self {
134 RoutingPolicy {
135 read_strategy: ReadStrategy::FirstAvailable,
136 write_strategy: WriteStrategy::All,
137 query_hints: HashMap::new(),
138 predicate_rules: HashMap::new(),
139 }
140 }
141}
142
/// Read-routing strategies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReadStrategy {
    /// Use the first healthy backend found.
    FirstAvailable,
    /// Rotate through healthy backends.
    RoundRobin,
    /// Pick a backend at random, biased by weight (not yet implemented in
    /// `route_read`; falls back to "primary").
    WeightedRandom,
    /// Query every healthy backend and merge results.
    Broadcast,
    /// Route by predicate pattern rules (not yet implemented in
    /// `route_read`; falls back to "primary").
    PatternBased,
}
157
/// Write-routing strategies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WriteStrategy {
    /// Write to every healthy, writable backend; any failure fails the write.
    All,
    /// Write only to the backend named "primary".
    PrimaryOnly,
    /// Write to `n` backends; fewer than `n` successes fails the write.
    Quorum { n: usize },
    /// Rule-based selection (not yet implemented in `route_write`; falls
    /// back to "primary").
    Selective,
}
170
/// Policy controlling data migration between backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPolicy {
    /// When true, migrations may start without an explicit API call.
    pub auto_migrate: bool,
    /// What causes an automatic migration to begin.
    pub trigger: MigrationTrigger,
    /// Number of triples moved per batch.
    pub batch_size: usize,
    /// Optional throughput cap (units interpreted by the migration worker —
    /// presumably triples/sec; confirm against the worker implementation).
    pub rate_limit: Option<usize>,
    /// Ordered migration rules evaluated by the migration machinery.
    pub rules: Vec<MigrationRule>,
}
185
186impl Default for MigrationPolicy {
187 fn default() -> Self {
188 MigrationPolicy {
189 auto_migrate: false,
190 trigger: MigrationTrigger::Manual,
191 batch_size: 10000,
192 rate_limit: Some(100000),
193 rules: Vec::new(),
194 }
195 }
196}
197
/// Conditions that can start an automatic migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MigrationTrigger {
    /// Only via an explicit `start_migration` call.
    Manual,
    /// When storage utilization crosses this fraction (0.0–1.0 presumed —
    /// confirm against the consumer).
    StorageThreshold(f64),
    /// On a fixed schedule (interval units not established by this file).
    Periodic(u32),
    /// When a cost model says moving data is cheaper.
    CostOptimization,
    /// When performance metrics suggest rebalancing.
    Performance,
}
212
/// A single source→target migration rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationRule {
    /// Human-readable rule name.
    pub name: String,
    /// Source backend name.
    pub source: String,
    /// Target backend name.
    pub target: String,
    /// Which triples the rule selects for migration.
    pub criteria: SelectionCriteria,
    /// Rule ordering priority (relative semantics defined by the consumer).
    pub priority: u32,
}
227
/// Criteria for selecting which triples a migration moves.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SelectionCriteria {
    /// Older than this many time units (units not established by this file).
    Age(u32),
    /// Accessed less often than `threshold`.
    AccessFrequency { threshold: u32 },
    /// Larger than this many bytes (presumed — confirm against the worker).
    Size(usize),
    /// Predicate IRI matches this pattern.
    PredicatePattern(String),
    /// Backend-defined custom expression.
    Custom(String),
}
242
/// A virtual storage engine that federates reads/writes over multiple
/// underlying [`StorageEngine`] backends with routing, caching, background
/// health monitoring, and (stubbed) migration support.
pub struct VirtualStorage {
    /// Immutable configuration captured at construction.
    config: VirtualStorageConfig,
    /// Live backend instances, keyed by backend name.
    backends: Arc<RwLock<HashMap<String, Arc<dyn StorageEngine>>>>,
    /// Mutable routing bookkeeping (health, round-robin counter).
    routing_state: Arc<RwLock<RoutingState>>,
    /// Active/historical migration bookkeeping.
    migration_state: Arc<RwLock<MigrationState>>,
    /// Triple + query-result LRU caches.
    cache: Arc<RwLock<StorageCache>>,
    /// Aggregate operation/latency statistics.
    stats: Arc<RwLock<VirtualStorageStats>>,
}
257
/// Mutable state consulted by the read/write routers.
struct RoutingState {
    /// Monotonic counter backing `ReadStrategy::RoundRobin`.
    round_robin_counter: usize,
    /// Per-backend health, updated by the background monitor.
    backend_health: HashMap<String, BackendHealth>,
    #[allow(dead_code)]
    // Names of migrations currently in flight (reserved for routing-aware
    // migration; not yet consulted).
    active_migrations: Vec<String>,
}
268
/// Health snapshot for one backend, maintained by the 30 s monitor loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BackendHealth {
    /// False after 3 consecutive failed probes; true again on any success.
    healthy: bool,
    /// Unix timestamp (seconds) of the last probe.
    last_check: u64,
    /// Consecutive probe failures since the last success.
    failure_count: u32,
    /// Duration of the most recent probe in milliseconds (despite the name,
    /// this is the last sample, not a running average).
    avg_response_time_ms: f64,
}
281
/// Bookkeeping for in-flight and past migrations.
struct MigrationState {
    /// Jobs keyed by job id; completed jobs currently remain here (the
    /// worker never moves them to `history`).
    active_migrations: HashMap<String, MigrationJob>,
    /// Completed-migration records (restored from backups).
    history: Vec<MigrationRecord>,
    #[allow(dead_code)]
    // Reserved worker-pool handle for real migrations; never populated yet.
    coordinator: Option<MigrationCoordinator>,
}
292
/// One migration job as tracked in `MigrationState::active_migrations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationJob {
    /// UUID assigned by `start_migration`.
    id: String,
    /// Source backend name.
    source: String,
    /// Target backend name.
    target: String,
    /// Progress counters updated by the worker.
    progress: MigrationProgress,
    /// Unix timestamp (seconds) when the job was created.
    start_time: u64,
    /// Current lifecycle state.
    status: MigrationStatus,
}
309
/// Progress counters for one migration job.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MigrationProgress {
    /// Total triples selected for migration (0 until counted).
    total_triples: u64,
    /// Triples moved so far.
    migrated_triples: u64,
    /// Triples that failed to move.
    failed_triples: u64,
    /// Index of the batch currently being processed.
    current_batch: u64,
    /// Estimated completion time, when computable (Unix seconds presumed —
    /// confirm once the real worker populates it).
    eta: Option<u64>,
}
324
/// Lifecycle states of a migration job.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MigrationStatus {
    /// Created but not yet picked up by the worker.
    Pending,
    Running,
    Paused,
    Completed,
    /// Aborted; the payload carries the error description.
    Failed(String),
    Cancelled,
}
335
/// Immutable record of a finished migration, kept in history and persisted
/// by `backup`/`restore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MigrationRecord {
    /// Id of the job this record summarizes.
    job_id: String,
    /// Source backend name.
    source: String,
    /// Target backend name.
    target: String,
    /// When the migration started.
    start_time: chrono::DateTime<chrono::Utc>,
    /// When the migration finished.
    end_time: chrono::DateTime<chrono::Utc>,
    /// Total triples moved.
    triples_migrated: u64,
    /// Final status the job ended with.
    status: MigrationStatus,
}
354
/// Handle to a pool of migration worker tasks plus their control channel.
/// Currently never constructed — reserved for the real migration engine.
struct MigrationCoordinator {
    #[allow(dead_code)]
    // Spawned worker task handles.
    workers: Vec<tokio::task::JoinHandle<()>>,
    #[allow(dead_code)]
    // Channel for sending pause/resume/cancel commands to workers.
    control_tx: tokio::sync::mpsc::Sender<MigrationControl>,
}
364
/// Commands sent to migration workers over the coordinator's channel.
/// All variants are currently unused (no coordinator is ever built).
#[derive(Debug)]
enum MigrationControl {
    #[allow(dead_code)]
    Pause,
    #[allow(dead_code)]
    Resume,
    #[allow(dead_code)]
    Cancel,
    #[allow(dead_code)]
    UpdateRateLimit(usize),
}
377
/// In-memory caches shared by all operations.
struct StorageCache {
    /// Individual triples keyed by their 64-bit hash (see `hash_triple`).
    /// Written on store; not read by any visible code path.
    triple_cache: lru::LruCache<u64, Triple>,
    /// Full query results keyed by the pattern's `Debug` representation.
    query_cache: lru::LruCache<String, Vec<Triple>>,
    /// Hit/miss counters for the query cache.
    stats: CacheStats,
}
387
/// Hit/miss/eviction counters for the query cache.
#[derive(Debug, Default)]
struct CacheStats {
    hits: u64,
    misses: u64,
    #[allow(dead_code)]
    // Declared for completeness; never incremented by the visible code.
    evictions: u64,
}
396
/// Aggregate statistics for the virtual storage layer.
#[derive(Debug, Default)]
struct VirtualStorageStats {
    /// Count of reads + writes combined (shared by both latency averages).
    total_operations: u64,
    #[allow(dead_code)]
    // Per-backend op counts; declared but never updated by the visible code.
    backend_operations: HashMap<String, u64>,
    #[allow(dead_code)]
    // Migration totals; declared but never updated by the visible code.
    migration_stats: MigrationStats,
    /// Latency/throughput metrics updated on each store/query.
    performance: PerformanceMetrics,
}
411
/// Lifetime migration totals. All fields are currently write-never —
/// reserved for the real migration engine.
#[derive(Debug, Default)]
struct MigrationStats {
    #[allow(dead_code)]
    total_migrations: u64,
    #[allow(dead_code)]
    successful_migrations: u64,
    #[allow(dead_code)]
    failed_migrations: u64,
    #[allow(dead_code)]
    total_triples_migrated: u64,
    #[allow(dead_code)]
    total_migration_time_sec: u64,
}
431
/// Rolling latency/throughput metrics.
#[derive(Debug, Default)]
struct PerformanceMetrics {
    /// Running average read latency (ms); averaged over the SHARED
    /// `total_operations` counter, so it is approximate.
    avg_read_latency_ms: f64,
    /// Running average write latency (ms); same caveat as above.
    avg_write_latency_ms: f64,
    /// Queries per second; never updated by the visible code (reported as-is
    /// in `stats()`).
    query_throughput_qps: f64,
    #[allow(dead_code)]
    // Declared but never updated by the visible code.
    write_throughput_tps: f64,
}
445
446impl VirtualStorage {
447 pub async fn new(config: VirtualStorageConfig) -> Result<Self, OxirsError> {
449 std::fs::create_dir_all(&config.path)?;
450
451 let cache_size = (config.cache_size_mb * 1024 * 1024) / 1000; Ok(VirtualStorage {
454 config: config.clone(),
455 backends: Arc::new(RwLock::new(HashMap::new())),
456 routing_state: Arc::new(RwLock::new(RoutingState {
457 round_robin_counter: 0,
458 backend_health: HashMap::new(),
459 active_migrations: Vec::new(),
460 })),
461 migration_state: Arc::new(RwLock::new(MigrationState {
462 active_migrations: HashMap::new(),
463 history: Vec::new(),
464 coordinator: None,
465 })),
466 cache: Arc::new(RwLock::new(StorageCache {
467 triple_cache: lru::LruCache::new(
468 std::num::NonZeroUsize::new(cache_size).unwrap_or(
469 std::num::NonZeroUsize::new(10000).expect("constant is non-zero"),
470 ),
471 ),
472 query_cache: lru::LruCache::new(
473 std::num::NonZeroUsize::new(1000).expect("constant is non-zero"),
474 ),
475 stats: CacheStats::default(),
476 })),
477 stats: Arc::new(RwLock::new(VirtualStorageStats::default())),
478 })
479 }
480
481 pub async fn initialize_backends(&self) -> Result<(), OxirsError> {
483 for backend_config in &self.config.backends {
484 let backend = self.create_backend(backend_config).await?;
485 let mut backends = self.backends.write().await;
486 backends.insert(backend_config.name.clone(), backend);
487
488 let mut routing = self.routing_state.write().await;
490 routing.backend_health.insert(
491 backend_config.name.clone(),
492 BackendHealth {
493 healthy: true,
494 last_check: std::time::SystemTime::now()
495 .duration_since(std::time::UNIX_EPOCH)
496 .expect("SystemTime should be after UNIX_EPOCH")
497 .as_secs(),
498 failure_count: 0,
499 avg_response_time_ms: 0.0,
500 },
501 );
502 }
503
504 self.start_health_monitoring();
506
507 Ok(())
508 }
509
510 pub async fn start_migration(
512 &self,
513 source: &str,
514 target: &str,
515 criteria: SelectionCriteria,
516 ) -> Result<String, OxirsError> {
517 let job_id = uuid::Uuid::new_v4().to_string();
518
519 let job = MigrationJob {
520 id: job_id.clone(),
521 source: source.to_string(),
522 target: target.to_string(),
523 progress: MigrationProgress {
524 total_triples: 0,
525 migrated_triples: 0,
526 failed_triples: 0,
527 current_batch: 0,
528 eta: None,
529 },
530 start_time: std::time::SystemTime::now()
531 .duration_since(std::time::UNIX_EPOCH)
532 .expect("SystemTime should be after UNIX_EPOCH")
533 .as_secs(),
534 status: MigrationStatus::Pending,
535 };
536
537 let mut migration_state = self.migration_state.write().await;
538 migration_state
539 .active_migrations
540 .insert(job_id.clone(), job);
541
542 self.spawn_migration_worker(
544 job_id.clone(),
545 source.to_string(),
546 target.to_string(),
547 criteria,
548 )
549 .await?;
550
551 Ok(job_id)
552 }
553
554 pub async fn get_migration_status(&self, job_id: &str) -> Result<MigrationJob, OxirsError> {
556 let migration_state = self.migration_state.read().await;
557 migration_state
558 .active_migrations
559 .get(job_id)
560 .cloned()
561 .ok_or_else(|| OxirsError::Store(format!("Migration job not found: {job_id}")))
562 }
563
564 async fn create_backend(
566 &self,
567 config: &BackendConfig,
568 ) -> Result<Arc<dyn StorageEngine>, OxirsError> {
569 match &config.backend_type {
570 BackendType::Tiered => {
571 Err(OxirsError::Store(
573 "Tiered backend temporarily disabled due to RocksDB dependency conflicts"
574 .to_string(),
575 ))
576 }
577 BackendType::Columnar => {
578 Err(OxirsError::Store(
580 "Columnar backend not yet integrated".to_string(),
581 ))
582 }
583 BackendType::Immutable => {
584 Err(OxirsError::Store(
586 "Immutable backend not yet integrated".to_string(),
587 ))
588 }
589 BackendType::Temporal => {
590 Err(OxirsError::Store(
592 "Temporal backend not yet integrated".to_string(),
593 ))
594 }
595 BackendType::Remote { endpoint } => {
596 Err(OxirsError::Store(format!(
598 "Remote backend not implemented: {endpoint}"
599 )))
600 }
601 BackendType::Cloud { provider: _ } => {
602 Err(OxirsError::Store(
604 "Cloud backend not implemented".to_string(),
605 ))
606 }
607 }
608 }
609
610 async fn route_read(&self) -> Result<Vec<String>, OxirsError> {
612 let routing_state = self.routing_state.read().await;
613 let backends = self.backends.read().await;
614
615 match self.config.routing.read_strategy {
616 ReadStrategy::FirstAvailable => {
617 for (name, health) in &routing_state.backend_health {
619 if health.healthy && backends.contains_key(name) {
620 return Ok(vec![name.clone()]);
621 }
622 }
623 Err(OxirsError::Store(
624 "No healthy backends available".to_string(),
625 ))
626 }
627 ReadStrategy::RoundRobin => {
628 let healthy_backends: Vec<_> = routing_state
630 .backend_health
631 .iter()
632 .filter(|(name, health)| health.healthy && backends.contains_key(*name))
633 .map(|(name, _)| name.clone())
634 .collect();
635
636 if healthy_backends.is_empty() {
637 return Err(OxirsError::Store(
638 "No healthy backends available".to_string(),
639 ));
640 }
641
642 let index = routing_state.round_robin_counter % healthy_backends.len();
643 Ok(vec![healthy_backends[index].clone()])
644 }
645 ReadStrategy::Broadcast => {
646 Ok(routing_state
648 .backend_health
649 .iter()
650 .filter(|(name, health)| health.healthy && backends.contains_key(*name))
651 .map(|(name, _)| name.clone())
652 .collect())
653 }
654 _ => Ok(vec!["primary".to_string()]), }
656 }
657
658 async fn route_write(&self) -> Result<Vec<String>, OxirsError> {
660 let routing_state = self.routing_state.read().await;
661 let backends = self.backends.read().await;
662
663 match self.config.routing.write_strategy {
664 WriteStrategy::All => {
665 Ok(self
667 .config
668 .backends
669 .iter()
670 .filter(|b| !b.read_only)
671 .filter(|b| {
672 routing_state
673 .backend_health
674 .get(&b.name)
675 .map(|h| h.healthy)
676 .unwrap_or(false)
677 })
678 .filter(|b| backends.contains_key(&b.name))
679 .map(|b| b.name.clone())
680 .collect())
681 }
682 WriteStrategy::PrimaryOnly => Ok(vec!["primary".to_string()]),
683 WriteStrategy::Quorum { n } => {
684 let writable: Vec<_> = self
686 .config
687 .backends
688 .iter()
689 .filter(|b| !b.read_only)
690 .filter(|b| {
691 routing_state
692 .backend_health
693 .get(&b.name)
694 .map(|h| h.healthy)
695 .unwrap_or(false)
696 })
697 .filter(|b| backends.contains_key(&b.name))
698 .take(n)
699 .map(|b| b.name.clone())
700 .collect();
701
702 if writable.len() < n {
703 return Err(OxirsError::Store(format!(
704 "Not enough healthy backends for quorum: need {}, have {}",
705 n,
706 writable.len()
707 )));
708 }
709
710 Ok(writable)
711 }
712 WriteStrategy::Selective => {
713 Ok(vec!["primary".to_string()]) }
716 }
717 }
718
719 fn start_health_monitoring(&self) {
721 let backends = self.backends.clone();
722 let routing_state = self.routing_state.clone();
723
724 tokio::spawn(async move {
725 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
726
727 loop {
728 interval.tick().await;
729
730 let backend_list = backends.read().await;
732 for (name, backend) in backend_list.iter() {
733 let start = std::time::Instant::now();
734 let health_check = backend.stats().await;
735 let elapsed = start.elapsed();
736
737 let mut routing = routing_state.write().await;
738 if let Some(health) = routing.backend_health.get_mut(name) {
739 health.last_check = std::time::SystemTime::now()
740 .duration_since(std::time::UNIX_EPOCH)
741 .expect("SystemTime should be after UNIX_EPOCH")
742 .as_secs();
743 health.avg_response_time_ms = elapsed.as_millis() as f64;
744
745 if health_check.is_ok() {
746 health.healthy = true;
747 health.failure_count = 0;
748 } else {
749 health.failure_count += 1;
750 if health.failure_count >= 3 {
751 health.healthy = false;
752 }
753 }
754 }
755 }
756 }
757 });
758 }
759
760 async fn spawn_migration_worker(
762 &self,
763 job_id: String,
764 _source: String,
765 _target: String,
766 _criteria: SelectionCriteria,
767 ) -> Result<(), OxirsError> {
768 let _backends = self.backends.clone();
769 let migration_state = self.migration_state.clone();
770 let _config = self.config.clone();
771
772 tokio::spawn(async move {
773 let mut state = migration_state.write().await;
776 if let Some(job) = state.active_migrations.get_mut(&job_id) {
777 job.status = MigrationStatus::Running;
778 }
779
780 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
782
783 let mut state = migration_state.write().await;
785 if let Some(job) = state.active_migrations.get_mut(&job_id) {
786 job.status = MigrationStatus::Completed;
787 job.progress.migrated_triples = 1000; }
789 });
790
791 Ok(())
792 }
793}
794
795#[async_trait]
796impl StorageEngine for VirtualStorage {
    /// Initialize the engine. The supplied `StorageConfig` is ignored — the
    /// `VirtualStorageConfig` given at construction time wins; this just
    /// creates and registers the configured backends.
    async fn init(&mut self, _config: super::StorageConfig) -> Result<(), OxirsError> {
        self.initialize_backends().await
    }
800
801 async fn store_triple(&self, triple: &Triple) -> Result<(), OxirsError> {
802 let start = std::time::Instant::now();
803
804 let target_backends = self.route_write().await?;
806 let backends = self.backends.read().await;
807
808 let mut errors = Vec::new();
809 for backend_name in &target_backends {
810 if let Some(backend) = backends.get(backend_name) {
811 if let Err(e) = backend.store_triple(triple).await {
812 errors.push((backend_name.clone(), e));
813 }
814 }
815 }
816
817 if self.config.caching {
819 let mut cache = self.cache.write().await;
820 let hash = self.hash_triple(triple);
821 cache.triple_cache.put(hash, triple.clone());
822 }
823
824 let elapsed = start.elapsed();
826 let mut stats = self.stats.write().await;
827 stats.total_operations += 1;
828 stats.performance.avg_write_latency_ms = (stats.performance.avg_write_latency_ms
829 * (stats.total_operations - 1) as f64
830 + elapsed.as_millis() as f64)
831 / stats.total_operations as f64;
832
833 match self.config.routing.write_strategy {
835 WriteStrategy::All if !errors.is_empty() => {
836 return Err(OxirsError::Store(format!(
837 "Failed to write to backends: {errors:?}"
838 )));
839 }
840 WriteStrategy::Quorum { n } if target_backends.len() - errors.len() < n => {
841 return Err(OxirsError::Store(format!(
842 "Quorum write failed: {} successes, needed {}",
843 target_backends.len() - errors.len(),
844 n
845 )));
846 }
847 _ => {}
848 }
849
850 Ok(())
851 }
852
853 async fn store_triples(&self, triples: &[Triple]) -> Result<(), OxirsError> {
854 for triple in triples {
856 self.store_triple(triple).await?;
857 }
858 Ok(())
859 }
860
    /// Query matching triples from the backend(s) chosen by the read
    /// strategy, deduplicating across backends, with optional result caching.
    ///
    /// Cache key is the pattern's `Debug` representation. Only non-empty
    /// result sets are cached (empty results are re-queried every time).
    /// Backend query failures are logged and skipped, never fatal.
    async fn query_triples(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
        let start = std::time::Instant::now();

        // Fast path: serve from the query cache. A write lock is needed even
        // to read because LruCache::get updates recency, and hit/miss
        // counters are bumped here too.
        if self.config.caching {
            let pattern_key = format!("{pattern:?}");
            let mut cache = self.cache.write().await;
            if let Some(cached) = cache.query_cache.get(&pattern_key).cloned() {
                cache.stats.hits += 1;
                return Ok(cached);
            }
            cache.stats.misses += 1;
        }

        let target_backends = self.route_read().await?;
        let backends = self.backends.read().await;

        let mut all_results = Vec::new();
        let mut seen = std::collections::HashSet::new();

        // Merge results across backends, deduplicating by 64-bit triple
        // hash. NOTE: a hash collision would silently drop a distinct
        // triple — acceptable here but worth knowing.
        for backend_name in &target_backends {
            if let Some(backend) = backends.get(backend_name) {
                match backend.query_triples(pattern).await {
                    Ok(results) => {
                        for triple in results {
                            let hash = self.hash_triple(&triple);
                            if seen.insert(hash) {
                                all_results.push(triple);
                            }
                        }
                    }
                    Err(e) => {
                        // Best-effort: a failing backend does not fail the query.
                        tracing::warn!("Query failed on backend {}: {}", backend_name, e);
                    }
                }
            }
        }

        if self.config.caching && !all_results.is_empty() {
            let pattern_key = format!("{pattern:?}");
            let mut cache = self.cache.write().await;
            cache.query_cache.put(pattern_key, all_results.clone());
        }

        // Running-average read latency. NOTE: total_operations counts reads
        // AND writes, so this average is approximate.
        let elapsed = start.elapsed();
        let mut stats = self.stats.write().await;
        stats.total_operations += 1;
        stats.performance.avg_read_latency_ms = (stats.performance.avg_read_latency_ms
            * (stats.total_operations - 1) as f64
            + elapsed.as_millis() as f64)
            / stats.total_operations as f64;

        Ok(all_results)
    }
918
919 async fn delete_triples(&self, pattern: &TriplePattern) -> Result<usize, OxirsError> {
920 let target_backends = self.route_write().await?;
922 let backends = self.backends.read().await;
923
924 let mut total_deleted = 0;
925 for backend_name in &target_backends {
926 if let Some(backend) = backends.get(backend_name) {
927 match backend.delete_triples(pattern).await {
928 Ok(count) => total_deleted = total_deleted.max(count),
929 Err(e) => {
930 tracing::warn!("Delete failed on backend {}: {}", backend_name, e);
931 }
932 }
933 }
934 }
935
936 if self.config.caching {
938 let mut cache = self.cache.write().await;
939 cache.query_cache.clear();
940 }
942
943 Ok(total_deleted)
944 }
945
    /// Aggregate statistics across all backends.
    ///
    /// Triple/size totals are summed over backends (backends whose own
    /// stats() fails are silently skipped). Tier stats are zeroed
    /// placeholders — the virtual layer has no tier visibility. p99 latency
    /// is a rough 2x-average estimate, and qps is reported as-is from
    /// `PerformanceMetrics` (which the visible code never updates).
    async fn stats(&self) -> Result<super::StorageStats, OxirsError> {
        let backends = self.backends.read().await;
        let stats = self.stats.read().await;

        let mut total_triples = 0u64;
        let mut total_size = 0u64;

        for (_, backend) in backends.iter() {
            // Best-effort: a backend that can't report stats contributes 0.
            if let Ok(backend_stats) = backend.stats().await {
                total_triples += backend_stats.total_triples;
                total_size += backend_stats.total_size_bytes;
            }
        }

        Ok(super::StorageStats {
            total_triples,
            total_size_bytes: total_size,
            // Placeholder tier breakdown: all zeros.
            tier_stats: super::TierStats {
                hot: super::TierStat {
                    triple_count: 0,
                    size_bytes: 0,
                    hit_rate: 0.0,
                    avg_access_time_us: 0,
                },
                warm: super::TierStat {
                    triple_count: 0,
                    size_bytes: 0,
                    hit_rate: 0.0,
                    avg_access_time_us: 0,
                },
                cold: super::TierStat {
                    triple_count: 0,
                    size_bytes: 0,
                    hit_rate: 0.0,
                    avg_access_time_us: 0,
                },
                archive: super::TierStat {
                    triple_count: 0,
                    size_bytes: 0,
                    hit_rate: 0.0,
                    avg_access_time_us: 0,
                },
            },
            compression_ratio: 1.0,
            query_metrics: super::QueryMetrics {
                avg_query_time_ms: stats.performance.avg_read_latency_ms,
                // Crude estimate: p99 ≈ 2x average.
                p99_query_time_ms: stats.performance.avg_read_latency_ms * 2.0,
                qps: stats.performance.query_throughput_qps,
                // Query-cache hit rate as a percentage; 0 when caching is
                // off or nothing has been queried yet.
                cache_hit_rate: if self.config.caching {
                    let cache = self.cache.read().await;
                    let total = cache.stats.hits + cache.stats.misses;
                    if total > 0 {
                        (cache.stats.hits as f64 / total as f64) * 100.0
                    } else {
                        0.0
                    }
                } else {
                    0.0
                },
            },
        })
    }
1009
1010 async fn optimize(&self) -> Result<(), OxirsError> {
1011 let backends = self.backends.read().await;
1012
1013 for (name, backend) in backends.iter() {
1015 if let Err(e) = backend.optimize().await {
1016 tracing::warn!("Optimization failed on backend {}: {}", name, e);
1017 }
1018 }
1019
1020 if self.config.caching {
1022 let mut cache = self.cache.write().await;
1023 cache.query_cache.clear();
1024 }
1025
1026 Ok(())
1027 }
1028
    /// Back up every backend into a per-backend subdirectory of `path`,
    /// plus a `virtual_metadata.json` capturing the config and migration
    /// history.
    ///
    /// # Errors
    /// Fails on the first backend whose backup fails, or on any FS/
    /// serialization error (earlier backends' backups are left in place).
    async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
        std::fs::create_dir_all(path)?;

        let backends = self.backends.read().await;

        // One subdirectory per backend, named after the backend.
        for (name, backend) in backends.iter() {
            let backend_path = path.join(name);
            backend.backup(&backend_path).await?;
        }

        let metadata = VirtualStorageMetadata {
            config: self.config.clone(),
            migration_history: {
                let state = self.migration_state.read().await;
                state.history.clone()
            },
        };

        let metadata_path = path.join("virtual_metadata.json");
        let metadata_json = serde_json::to_string_pretty(&metadata)
            .map_err(|e| OxirsError::Serialize(e.to_string()))?;
        std::fs::write(metadata_path, metadata_json)?;

        Ok(())
    }
1056
    /// Restore from a backup produced by [`Self::backup`].
    ///
    /// Restores each currently-registered backend from its subdirectory (if
    /// present) and reinstates the migration history. NOTE: the config
    /// embedded in the metadata file is parsed but NOT applied — the live
    /// instance keeps its own config.
    async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
        let metadata_path = path.join("virtual_metadata.json");
        let metadata_json = std::fs::read_to_string(metadata_path)?;
        let metadata: VirtualStorageMetadata =
            serde_json::from_str(&metadata_json).map_err(|e| OxirsError::Parse(e.to_string()))?;

        let backends = self.backends.read().await;

        // Backends present in the backup but not currently registered are
        // silently skipped (and vice versa).
        for (name, backend) in backends.iter() {
            let backend_path = path.join(name);
            if backend_path.exists() {
                backend.restore(&backend_path).await?;
            }
        }

        let mut state = self.migration_state.write().await;
        state.history = metadata.migration_history;

        Ok(())
    }
1080}
1081
/// Metadata persisted alongside backend backups in `virtual_metadata.json`.
#[derive(Debug, Serialize, Deserialize)]
struct VirtualStorageMetadata {
    /// Config snapshot at backup time (recorded but not applied on restore).
    config: VirtualStorageConfig,
    /// Completed-migration history, reinstated on restore.
    migration_history: Vec<MigrationRecord>,
}
1088
1089impl VirtualStorage {
1090 fn hash_triple(&self, triple: &Triple) -> u64 {
1092 use std::hash::{Hash, Hasher};
1093 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1094 triple.hash(&mut hasher);
1095 hasher.finish()
1096 }
1097}
1098
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    // End-to-end smoke test: create storage, init backends, store one
    // triple, query it back. Ignored because create_backend currently
    // rejects every backend type.
    #[tokio::test]
    #[ignore = "Virtual storage backends not yet implemented - all backend types are disabled"]
    async fn test_virtual_storage() {
        // Unique per-run directory so parallel/repeated runs don't collide.
        let test_dir = format!(
            "/tmp/oxirs_virtual_test_{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("operation should succeed")
                .as_millis()
        );

        let mut config = VirtualStorageConfig {
            path: PathBuf::from(&test_dir),
            ..Default::default()
        };

        // Point the default tiered backend's tier paths into the test dir.
        if let Some(backend) = config.backends.get_mut(0) {
            backend.config = serde_json::json!({
                "enable_tiering": true,
                "enable_columnar": false,
                "enable_temporal": false,
                "compression": {
                    "Zstd": { "level": 3 }
                },
                "tiers": {
                    "hot_tier": {
                        "max_size_mb": 1024,
                        "eviction_policy": "Lru",
                        "ttl_seconds": 3600
                    },
                    "warm_tier": {
                        "path": format!("{test_dir}/warm"),
                        "max_size_gb": 10,
                        "promotion_threshold": 10,
                        "demotion_threshold_days": 7
                    },
                    "cold_tier": {
                        "path": format!("{test_dir}/cold"),
                        "max_size_tb": 1,
                        "compression_level": 9,
                        "archive_threshold_days": 90
                    },
                    "archive_tier": {
                        "backend": { "Local": format!("{test_dir}/archive") },
                        "retention_years": 7,
                        "immutable": true
                    }
                },
                "cache_size_mb": 512
            });
        }

        let storage = VirtualStorage::new(config)
            .await
            .expect("async operation should succeed");
        storage
            .initialize_backends()
            .await
            .expect("async operation should succeed");

        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("test")),
        );

        storage
            .store_triple(&triple)
            .await
            .expect("async operation should succeed");

        // Wildcard pattern should return exactly the one stored triple.
        let pattern = TriplePattern::new(None, None, None);
        let results = storage
            .query_triples(&pattern)
            .await
            .expect("async operation should succeed");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);
    }
}