Skip to main content

oxirs_core/storage/
virtualization.rs

1//! Storage virtualization with transparent migration
2//!
3//! This module provides a virtualized storage layer that can transparently
4//! route requests to different storage backends and migrate data between them.
5
6use crate::model::{Triple, TriplePattern};
7use crate::storage::StorageEngine;
8// Note: tiered::TieredStorageEngine temporarily disabled due to dependency conflicts
9use crate::OxirsError;
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
/// Virtual storage configuration.
///
/// Top-level settings consumed by `VirtualStorage`: where its metadata lives,
/// which backends it fronts, and how reads, writes and migrations are routed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VirtualStorageConfig {
    /// Base path for virtual storage metadata (created by `VirtualStorage::new`).
    pub path: PathBuf,
    /// Backend configurations, one entry per backend to instantiate.
    pub backends: Vec<BackendConfig>,
    /// Routing policy for reads and writes.
    pub routing: RoutingPolicy,
    /// Migration policy.
    pub migration: MigrationPolicy,
    /// Enable transparent caching of stored triples and query results.
    pub caching: bool,
    /// Cache size in MB; `VirtualStorage::new` converts it to an approximate
    /// entry count for the triple cache.
    pub cache_size_mb: usize,
}
33
34impl Default for VirtualStorageConfig {
35    fn default() -> Self {
36        VirtualStorageConfig {
37            path: PathBuf::from("/var/oxirs/virtual"),
38            backends: vec![BackendConfig {
39                name: "primary".to_string(),
40                backend_type: BackendType::Tiered,
41                config: serde_json::json!({
42                    "enable_tiering": true,
43                    "enable_columnar": false,
44                    "enable_temporal": false,
45                    "compression": {
46                        "Zstd": { "level": 3 }
47                    },
48                    "tiers": {
49                        "hot_tier": {
50                            "max_size_mb": 1024,
51                            "eviction_policy": "Lru",
52                            "ttl_seconds": 3600
53                        },
54                        "warm_tier": {
55                            "path": "/tmp/oxirs_virtual_warm",
56                            "max_size_gb": 10,
57                            "promotion_threshold": 10,
58                            "demotion_threshold_days": 7
59                        },
60                        "cold_tier": {
61                            "path": "/tmp/oxirs_virtual_cold",
62                            "max_size_tb": 1,
63                            "compression_level": 9,
64                            "archive_threshold_days": 90
65                        },
66                        "archive_tier": {
67                            "backend": { "Local": "/tmp/oxirs_virtual_archive" },
68                            "retention_years": 7,
69                            "immutable": true
70                        }
71                    },
72                    "cache_size_mb": 512
73                }),
74                weight: 1.0,
75                read_only: false,
76            }],
77            routing: RoutingPolicy::default(),
78            migration: MigrationPolicy::default(),
79            caching: true,
80            cache_size_mb: 1024,
81        }
82    }
83}
84
/// Backend configuration.
///
/// Describes one storage backend that the virtual layer can route to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendConfig {
    /// Backend name; used as the key under which the backend and its health
    /// record are registered.
    pub name: String,
    /// Backend type, selecting which engine to instantiate.
    pub backend_type: BackendType,
    /// Backend-specific configuration, kept as free-form JSON.
    pub config: serde_json::Value,
    /// Routing weight (for load balancing).
    pub weight: f64,
    /// Read-only flag; read-only backends are excluded from write routing.
    pub read_only: bool,
}
99
/// Storage backend type.
///
/// Selects the engine `create_backend` should build. In the visible
/// implementation every variant currently yields an error because no engine
/// is wired in yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackendType {
    /// Tiered storage engine.
    Tiered,
    /// Columnar storage engine.
    Columnar,
    /// Immutable storage engine.
    Immutable,
    /// Temporal storage engine.
    Temporal,
    /// Remote backend reached through `endpoint`.
    Remote { endpoint: String },
    /// Cloud-hosted backend at the given provider.
    Cloud { provider: CloudProvider },
}
110
/// Cloud storage provider.
///
/// Each variant carries the two identifiers declared for that provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CloudProvider {
    /// AWS target identified by bucket and region.
    AWS { bucket: String, region: String },
    /// GCP target identified by bucket and project.
    GCP { bucket: String, project: String },
    /// Azure target identified by container and account.
    Azure { container: String, account: String },
}
118
/// Routing policy for virtual storage.
///
/// `read_strategy` and `write_strategy` drive `route_read` and `route_write`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingPolicy {
    /// Read routing strategy.
    pub read_strategy: ReadStrategy,
    /// Write routing strategy.
    pub write_strategy: WriteStrategy,
    /// Query routing hints (string key/value pairs).
    pub query_hints: HashMap<String, String>,
    /// Predicate-based routing rules (string key/value pairs).
    // NOTE(review): presumably predicate -> backend name; confirm against the consumer.
    pub predicate_rules: HashMap<String, String>,
}
131
132impl Default for RoutingPolicy {
133    fn default() -> Self {
134        RoutingPolicy {
135            read_strategy: ReadStrategy::FirstAvailable,
136            write_strategy: WriteStrategy::All,
137            query_hints: HashMap::new(),
138            predicate_rules: HashMap::new(),
139        }
140    }
141}
142
/// Read routing strategy.
///
/// Interpreted by `route_read`, which only considers backends that are both
/// registered and marked healthy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReadStrategy {
    /// Use first available backend.
    FirstAvailable,
    /// Round-robin across backends.
    RoundRobin,
    /// Weighted random selection.
    /// Not handled specifically by `route_read`, which falls back to the
    /// backend named `"primary"`.
    WeightedRandom,
    /// Query all and merge results.
    Broadcast,
    /// Use specific backend based on pattern.
    /// Not handled specifically by `route_read`, which falls back to the
    /// backend named `"primary"`.
    PatternBased,
}
157
/// Write routing strategy.
///
/// Interpreted by `route_write`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WriteStrategy {
    /// Write to all backends that are writable, healthy and registered.
    All,
    /// Write to primary only (the backend named `"primary"`).
    PrimaryOnly,
    /// Write to N backends; routing fails when fewer than `n` qualify.
    Quorum { n: usize },
    /// Write based on data characteristics.
    /// `route_write` currently simplifies this to the `"primary"` backend.
    Selective,
}
170
/// Migration policy.
///
/// Declarative settings describing when and how data should be moved between
/// backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPolicy {
    /// Enable automatic migration.
    pub auto_migrate: bool,
    /// Migration trigger.
    pub trigger: MigrationTrigger,
    /// Batch size for migration.
    // NOTE(review): unit is presumably triples per batch; confirm with the worker.
    pub batch_size: usize,
    /// Rate limit (triples per second); `None` carries no limit value.
    pub rate_limit: Option<usize>,
    /// Migration rules.
    pub rules: Vec<MigrationRule>,
}
185
186impl Default for MigrationPolicy {
187    fn default() -> Self {
188        MigrationPolicy {
189            auto_migrate: false,
190            trigger: MigrationTrigger::Manual,
191            batch_size: 10000,
192            rate_limit: Some(100000),
193            rules: Vec::new(),
194        }
195    }
196}
197
/// Migration trigger.
///
/// Names the condition under which a migration should start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MigrationTrigger {
    /// Manual migration only.
    Manual,
    /// Storage threshold (percentage).
    StorageThreshold(f64),
    /// Time-based (hours).
    Periodic(u32),
    /// Cost-based optimization.
    CostOptimization,
    /// Performance-based.
    Performance,
}
212
/// Migration rule.
///
/// Pairs a source/target backend with the criteria selecting what to move.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationRule {
    /// Rule name.
    pub name: String,
    /// Source backend pattern.
    pub source: String,
    /// Target backend.
    pub target: String,
    /// Selection criteria.
    pub criteria: SelectionCriteria,
    /// Priority.
    // NOTE(review): ordering (higher vs. lower first) is not visible here; confirm.
    pub priority: u32,
}
227
/// Selection criteria for migration.
///
/// Passed to `VirtualStorage::start_migration`; the placeholder worker in
/// this file does not evaluate it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SelectionCriteria {
    /// Age-based (days).
    Age(u32),
    /// Access frequency.
    AccessFrequency { threshold: u32 },
    /// Size-based (MB).
    Size(usize),
    /// Predicate pattern.
    PredicatePattern(String),
    /// Custom function.
    Custom(String),
}
242
/// Virtual storage engine.
///
/// Fronts a set of named backends. All mutable state sits behind
/// `Arc<RwLock<..>>` so spawned background tasks can share it.
pub struct VirtualStorage {
    /// Configuration supplied at construction time.
    config: VirtualStorageConfig,
    /// Storage backends, keyed by backend name.
    backends: Arc<RwLock<HashMap<String, Arc<dyn StorageEngine>>>>,
    /// Routing state (round-robin counter and per-backend health).
    routing_state: Arc<RwLock<RoutingState>>,
    /// Migration state (active jobs and history).
    migration_state: Arc<RwLock<MigrationState>>,
    /// Cache of stored triples and query results.
    cache: Arc<RwLock<StorageCache>>,
    /// Statistics (operation counts and latency averages).
    stats: Arc<RwLock<VirtualStorageStats>>,
}
257
/// Routing state.
///
/// Mutable bookkeeping consulted when choosing backends for reads and writes.
struct RoutingState {
    /// Round-robin counter; taken modulo the number of healthy backends to
    /// pick one for `ReadStrategy::RoundRobin`.
    round_robin_counter: usize,
    /// Backend health status, keyed by backend name.
    backend_health: HashMap<String, BackendHealth>,
    /// Active migrations.
    #[allow(dead_code)]
    active_migrations: Vec<String>,
}
268
/// Backend health status.
///
/// Created as healthy when a backend is initialized and refreshed by the
/// background health monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BackendHealth {
    /// Is backend healthy; cleared after three consecutive failed probes.
    healthy: bool,
    /// Last check time (Unix timestamp in seconds).
    last_check: u64,
    /// Consecutive failures; reset to zero by a successful probe.
    failure_count: u32,
    /// Response time in milliseconds. Despite the name, the health monitor
    /// overwrites it with the duration of the most recent probe.
    avg_response_time_ms: f64,
}
281
/// Migration state.
///
/// Shared between `VirtualStorage` and the migration worker tasks it spawns.
struct MigrationState {
    /// Active migrations, keyed by job ID.
    active_migrations: HashMap<String, MigrationJob>,
    /// Migration history.
    history: Vec<MigrationRecord>,
    /// Migration coordinator; `VirtualStorage::new` leaves it as `None`.
    #[allow(dead_code)]
    coordinator: Option<MigrationCoordinator>,
}
292
/// Migration job.
///
/// Snapshot of one migration, as returned by
/// `VirtualStorage::get_migration_status`. Its fields are private.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationJob {
    /// Job ID (a UUID v4 string generated by `start_migration`).
    id: String,
    /// Source backend.
    source: String,
    /// Target backend.
    target: String,
    /// Progress counters.
    progress: MigrationProgress,
    /// Start time (Unix timestamp in seconds).
    start_time: u64,
    /// Status; jobs start out as `Pending`.
    status: MigrationStatus,
}
309
/// Migration progress.
///
/// Counters describing how far a job has advanced; all start at zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MigrationProgress {
    /// Total triples to migrate.
    total_triples: u64,
    /// Triples migrated.
    migrated_triples: u64,
    /// Triples failed.
    failed_triples: u64,
    /// Current batch.
    current_batch: u64,
    /// Estimated completion time (seconds); `None` until an estimate exists.
    eta: Option<u64>,
}
324
/// Migration status.
///
/// Lifecycle state of a migration job.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MigrationStatus {
    /// Registered but not yet picked up by a worker.
    Pending,
    /// A worker is processing the job.
    Running,
    /// Paused.
    Paused,
    /// Finished.
    Completed,
    /// Failed; carries a message string.
    Failed(String),
    /// Cancelled.
    Cancelled,
}
335
/// Migration record.
///
/// History entry describing one migration, stored in `MigrationState::history`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MigrationRecord {
    /// Job ID.
    job_id: String,
    /// Source backend.
    source: String,
    /// Target backend.
    target: String,
    /// Start time (UTC).
    start_time: chrono::DateTime<chrono::Utc>,
    /// End time (UTC).
    end_time: chrono::DateTime<chrono::Utc>,
    /// Triples migrated.
    triples_migrated: u64,
    /// Status.
    status: MigrationStatus,
}
354
/// Migration coordinator.
///
/// Holds handles to worker tasks and the sending half of their control
/// channel. Both fields are currently unused (`dead_code` is allowed).
struct MigrationCoordinator {
    /// Active workers (join handles of spawned tasks).
    #[allow(dead_code)]
    workers: Vec<tokio::task::JoinHandle<()>>,
    /// Control channel carrying `MigrationControl` messages.
    #[allow(dead_code)]
    control_tx: tokio::sync::mpsc::Sender<MigrationControl>,
}
364
/// Migration control messages.
///
/// Message type of the coordinator's control channel. No variant is
/// constructed in the visible code, hence the `dead_code` allowances.
#[derive(Debug)]
enum MigrationControl {
    /// Pause request.
    #[allow(dead_code)]
    Pause,
    /// Resume request.
    #[allow(dead_code)]
    Resume,
    /// Cancel request.
    #[allow(dead_code)]
    Cancel,
    /// Rate-limit update carrying the new value.
    #[allow(dead_code)]
    UpdateRateLimit(usize),
}
377
/// Storage cache.
///
/// Two LRU caches plus hit/miss counters, guarded as a unit by one lock.
struct StorageCache {
    /// Triple cache, keyed by the `u64` produced by `hash_triple`.
    triple_cache: lru::LruCache<u64, Triple>,
    /// Query result cache, keyed by the `Debug` rendering of the pattern.
    query_cache: lru::LruCache<String, Vec<Triple>>,
    /// Cache statistics.
    stats: CacheStats,
}
387
/// Cache statistics.
///
/// Counters for the query-result cache.
#[derive(Debug, Default)]
struct CacheStats {
    /// Query-cache lookups that returned a cached result.
    hits: u64,
    /// Query-cache lookups that found nothing.
    misses: u64,
    /// Evictions (not updated in the visible code).
    #[allow(dead_code)]
    evictions: u64,
}
396
/// Virtual storage statistics.
///
/// Aggregate counters maintained by the `StorageEngine` implementation.
#[derive(Debug, Default)]
struct VirtualStorageStats {
    /// Total operations; incremented by both single-triple stores and queries.
    total_operations: u64,
    /// Operations by backend.
    #[allow(dead_code)]
    backend_operations: HashMap<String, u64>,
    /// Migration statistics.
    #[allow(dead_code)]
    migration_stats: MigrationStats,
    /// Performance metrics.
    performance: PerformanceMetrics,
}
411
/// Migration statistics.
///
/// Aggregate migration counters; none are updated in the visible code, hence
/// the `dead_code` allowances.
#[derive(Debug, Default)]
struct MigrationStats {
    /// Total migrations.
    #[allow(dead_code)]
    total_migrations: u64,
    /// Successful migrations.
    #[allow(dead_code)]
    successful_migrations: u64,
    /// Failed migrations.
    #[allow(dead_code)]
    failed_migrations: u64,
    /// Total triples migrated.
    #[allow(dead_code)]
    total_triples_migrated: u64,
    /// Total migration time (seconds).
    #[allow(dead_code)]
    total_migration_time_sec: u64,
}
431
/// Performance metrics.
///
/// The two latency fields are running means that use the shared
/// `total_operations` counter as their divisor, so reads and writes
/// influence each other's average.
#[derive(Debug, Default)]
struct PerformanceMetrics {
    /// Average read latency (ms), updated by `query_triples`.
    avg_read_latency_ms: f64,
    /// Average write latency (ms), updated by `store_triple`.
    avg_write_latency_ms: f64,
    /// Query throughput (queries per second).
    query_throughput_qps: f64,
    /// Write throughput (not updated in the visible code).
    #[allow(dead_code)]
    write_throughput_tps: f64,
}
445
446impl VirtualStorage {
447    /// Create new virtual storage
448    pub async fn new(config: VirtualStorageConfig) -> Result<Self, OxirsError> {
449        std::fs::create_dir_all(&config.path)?;
450
451        let cache_size = (config.cache_size_mb * 1024 * 1024) / 1000; // Approximate entries
452
453        Ok(VirtualStorage {
454            config: config.clone(),
455            backends: Arc::new(RwLock::new(HashMap::new())),
456            routing_state: Arc::new(RwLock::new(RoutingState {
457                round_robin_counter: 0,
458                backend_health: HashMap::new(),
459                active_migrations: Vec::new(),
460            })),
461            migration_state: Arc::new(RwLock::new(MigrationState {
462                active_migrations: HashMap::new(),
463                history: Vec::new(),
464                coordinator: None,
465            })),
466            cache: Arc::new(RwLock::new(StorageCache {
467                triple_cache: lru::LruCache::new(
468                    std::num::NonZeroUsize::new(cache_size).unwrap_or(
469                        std::num::NonZeroUsize::new(10000).expect("constant is non-zero"),
470                    ),
471                ),
472                query_cache: lru::LruCache::new(
473                    std::num::NonZeroUsize::new(1000).expect("constant is non-zero"),
474                ),
475                stats: CacheStats::default(),
476            })),
477            stats: Arc::new(RwLock::new(VirtualStorageStats::default())),
478        })
479    }
480
481    /// Initialize backends
482    pub async fn initialize_backends(&self) -> Result<(), OxirsError> {
483        for backend_config in &self.config.backends {
484            let backend = self.create_backend(backend_config).await?;
485            let mut backends = self.backends.write().await;
486            backends.insert(backend_config.name.clone(), backend);
487
488            // Initialize health status
489            let mut routing = self.routing_state.write().await;
490            routing.backend_health.insert(
491                backend_config.name.clone(),
492                BackendHealth {
493                    healthy: true,
494                    last_check: std::time::SystemTime::now()
495                        .duration_since(std::time::UNIX_EPOCH)
496                        .expect("SystemTime should be after UNIX_EPOCH")
497                        .as_secs(),
498                    failure_count: 0,
499                    avg_response_time_ms: 0.0,
500                },
501            );
502        }
503
504        // Start health monitoring
505        self.start_health_monitoring();
506
507        Ok(())
508    }
509
510    /// Start a migration job
511    pub async fn start_migration(
512        &self,
513        source: &str,
514        target: &str,
515        criteria: SelectionCriteria,
516    ) -> Result<String, OxirsError> {
517        let job_id = uuid::Uuid::new_v4().to_string();
518
519        let job = MigrationJob {
520            id: job_id.clone(),
521            source: source.to_string(),
522            target: target.to_string(),
523            progress: MigrationProgress {
524                total_triples: 0,
525                migrated_triples: 0,
526                failed_triples: 0,
527                current_batch: 0,
528                eta: None,
529            },
530            start_time: std::time::SystemTime::now()
531                .duration_since(std::time::UNIX_EPOCH)
532                .expect("SystemTime should be after UNIX_EPOCH")
533                .as_secs(),
534            status: MigrationStatus::Pending,
535        };
536
537        let mut migration_state = self.migration_state.write().await;
538        migration_state
539            .active_migrations
540            .insert(job_id.clone(), job);
541
542        // Start migration worker
543        self.spawn_migration_worker(
544            job_id.clone(),
545            source.to_string(),
546            target.to_string(),
547            criteria,
548        )
549        .await?;
550
551        Ok(job_id)
552    }
553
554    /// Get migration status
555    pub async fn get_migration_status(&self, job_id: &str) -> Result<MigrationJob, OxirsError> {
556        let migration_state = self.migration_state.read().await;
557        migration_state
558            .active_migrations
559            .get(job_id)
560            .cloned()
561            .ok_or_else(|| OxirsError::Store(format!("Migration job not found: {job_id}")))
562    }
563
564    /// Create backend instance
565    async fn create_backend(
566        &self,
567        config: &BackendConfig,
568    ) -> Result<Arc<dyn StorageEngine>, OxirsError> {
569        match &config.backend_type {
570            BackendType::Tiered => {
571                // Tiered backend temporarily disabled due to dependency conflicts
572                Err(OxirsError::Store(
573                    "Tiered backend temporarily disabled due to RocksDB dependency conflicts"
574                        .to_string(),
575                ))
576            }
577            BackendType::Columnar => {
578                // Create columnar backend
579                Err(OxirsError::Store(
580                    "Columnar backend not yet integrated".to_string(),
581                ))
582            }
583            BackendType::Immutable => {
584                // Create immutable backend
585                Err(OxirsError::Store(
586                    "Immutable backend not yet integrated".to_string(),
587                ))
588            }
589            BackendType::Temporal => {
590                // Create temporal backend
591                Err(OxirsError::Store(
592                    "Temporal backend not yet integrated".to_string(),
593                ))
594            }
595            BackendType::Remote { endpoint } => {
596                // Create remote backend proxy
597                Err(OxirsError::Store(format!(
598                    "Remote backend not implemented: {endpoint}"
599                )))
600            }
601            BackendType::Cloud { provider: _ } => {
602                // Create cloud backend
603                Err(OxirsError::Store(
604                    "Cloud backend not implemented".to_string(),
605                ))
606            }
607        }
608    }
609
610    /// Route read operation
611    async fn route_read(&self) -> Result<Vec<String>, OxirsError> {
612        let routing_state = self.routing_state.read().await;
613        let backends = self.backends.read().await;
614
615        match self.config.routing.read_strategy {
616            ReadStrategy::FirstAvailable => {
617                // Return first healthy backend
618                for (name, health) in &routing_state.backend_health {
619                    if health.healthy && backends.contains_key(name) {
620                        return Ok(vec![name.clone()]);
621                    }
622                }
623                Err(OxirsError::Store(
624                    "No healthy backends available".to_string(),
625                ))
626            }
627            ReadStrategy::RoundRobin => {
628                // Round-robin selection
629                let healthy_backends: Vec<_> = routing_state
630                    .backend_health
631                    .iter()
632                    .filter(|(name, health)| health.healthy && backends.contains_key(*name))
633                    .map(|(name, _)| name.clone())
634                    .collect();
635
636                if healthy_backends.is_empty() {
637                    return Err(OxirsError::Store(
638                        "No healthy backends available".to_string(),
639                    ));
640                }
641
642                let index = routing_state.round_robin_counter % healthy_backends.len();
643                Ok(vec![healthy_backends[index].clone()])
644            }
645            ReadStrategy::Broadcast => {
646                // Query all healthy backends
647                Ok(routing_state
648                    .backend_health
649                    .iter()
650                    .filter(|(name, health)| health.healthy && backends.contains_key(*name))
651                    .map(|(name, _)| name.clone())
652                    .collect())
653            }
654            _ => Ok(vec!["primary".to_string()]), // Default to primary
655        }
656    }
657
658    /// Route write operation
659    async fn route_write(&self) -> Result<Vec<String>, OxirsError> {
660        let routing_state = self.routing_state.read().await;
661        let backends = self.backends.read().await;
662
663        match self.config.routing.write_strategy {
664            WriteStrategy::All => {
665                // Write to all writable backends
666                Ok(self
667                    .config
668                    .backends
669                    .iter()
670                    .filter(|b| !b.read_only)
671                    .filter(|b| {
672                        routing_state
673                            .backend_health
674                            .get(&b.name)
675                            .map(|h| h.healthy)
676                            .unwrap_or(false)
677                    })
678                    .filter(|b| backends.contains_key(&b.name))
679                    .map(|b| b.name.clone())
680                    .collect())
681            }
682            WriteStrategy::PrimaryOnly => Ok(vec!["primary".to_string()]),
683            WriteStrategy::Quorum { n } => {
684                // Select N backends
685                let writable: Vec<_> = self
686                    .config
687                    .backends
688                    .iter()
689                    .filter(|b| !b.read_only)
690                    .filter(|b| {
691                        routing_state
692                            .backend_health
693                            .get(&b.name)
694                            .map(|h| h.healthy)
695                            .unwrap_or(false)
696                    })
697                    .filter(|b| backends.contains_key(&b.name))
698                    .take(n)
699                    .map(|b| b.name.clone())
700                    .collect();
701
702                if writable.len() < n {
703                    return Err(OxirsError::Store(format!(
704                        "Not enough healthy backends for quorum: need {}, have {}",
705                        n,
706                        writable.len()
707                    )));
708                }
709
710                Ok(writable)
711            }
712            WriteStrategy::Selective => {
713                // Selective routing based on data characteristics
714                Ok(vec!["primary".to_string()]) // Simplified
715            }
716        }
717    }
718
719    /// Start health monitoring
720    fn start_health_monitoring(&self) {
721        let backends = self.backends.clone();
722        let routing_state = self.routing_state.clone();
723
724        tokio::spawn(async move {
725            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
726
727            loop {
728                interval.tick().await;
729
730                // Check health of each backend
731                let backend_list = backends.read().await;
732                for (name, backend) in backend_list.iter() {
733                    let start = std::time::Instant::now();
734                    let health_check = backend.stats().await;
735                    let elapsed = start.elapsed();
736
737                    let mut routing = routing_state.write().await;
738                    if let Some(health) = routing.backend_health.get_mut(name) {
739                        health.last_check = std::time::SystemTime::now()
740                            .duration_since(std::time::UNIX_EPOCH)
741                            .expect("SystemTime should be after UNIX_EPOCH")
742                            .as_secs();
743                        health.avg_response_time_ms = elapsed.as_millis() as f64;
744
745                        if health_check.is_ok() {
746                            health.healthy = true;
747                            health.failure_count = 0;
748                        } else {
749                            health.failure_count += 1;
750                            if health.failure_count >= 3 {
751                                health.healthy = false;
752                            }
753                        }
754                    }
755                }
756            }
757        });
758    }
759
760    /// Spawn migration worker
761    async fn spawn_migration_worker(
762        &self,
763        job_id: String,
764        _source: String,
765        _target: String,
766        _criteria: SelectionCriteria,
767    ) -> Result<(), OxirsError> {
768        let _backends = self.backends.clone();
769        let migration_state = self.migration_state.clone();
770        let _config = self.config.clone();
771
772        tokio::spawn(async move {
773            // Migration logic would go here
774            // This is a simplified placeholder
775            let mut state = migration_state.write().await;
776            if let Some(job) = state.active_migrations.get_mut(&job_id) {
777                job.status = MigrationStatus::Running;
778            }
779
780            // Simulate migration
781            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
782
783            // Update status
784            let mut state = migration_state.write().await;
785            if let Some(job) = state.active_migrations.get_mut(&job_id) {
786                job.status = MigrationStatus::Completed;
787                job.progress.migrated_triples = 1000; // Simulated
788            }
789        });
790
791        Ok(())
792    }
793}
794
795#[async_trait]
796impl StorageEngine for VirtualStorage {
    /// Initializes the engine by delegating to `initialize_backends`.
    ///
    /// The supplied `_config` is ignored; backends come from the
    /// `VirtualStorageConfig` this instance was built with.
    async fn init(&mut self, _config: super::StorageConfig) -> Result<(), OxirsError> {
        self.initialize_backends().await
    }
800
801    async fn store_triple(&self, triple: &Triple) -> Result<(), OxirsError> {
802        let start = std::time::Instant::now();
803
804        // Route write
805        let target_backends = self.route_write().await?;
806        let backends = self.backends.read().await;
807
808        let mut errors = Vec::new();
809        for backend_name in &target_backends {
810            if let Some(backend) = backends.get(backend_name) {
811                if let Err(e) = backend.store_triple(triple).await {
812                    errors.push((backend_name.clone(), e));
813                }
814            }
815        }
816
817        // Update cache
818        if self.config.caching {
819            let mut cache = self.cache.write().await;
820            let hash = self.hash_triple(triple);
821            cache.triple_cache.put(hash, triple.clone());
822        }
823
824        // Update statistics
825        let elapsed = start.elapsed();
826        let mut stats = self.stats.write().await;
827        stats.total_operations += 1;
828        stats.performance.avg_write_latency_ms = (stats.performance.avg_write_latency_ms
829            * (stats.total_operations - 1) as f64
830            + elapsed.as_millis() as f64)
831            / stats.total_operations as f64;
832
833        // Handle errors based on write strategy
834        match self.config.routing.write_strategy {
835            WriteStrategy::All if !errors.is_empty() => {
836                return Err(OxirsError::Store(format!(
837                    "Failed to write to backends: {errors:?}"
838                )));
839            }
840            WriteStrategy::Quorum { n } if target_backends.len() - errors.len() < n => {
841                return Err(OxirsError::Store(format!(
842                    "Quorum write failed: {} successes, needed {}",
843                    target_backends.len() - errors.len(),
844                    n
845                )));
846            }
847            _ => {}
848        }
849
850        Ok(())
851    }
852
853    async fn store_triples(&self, triples: &[Triple]) -> Result<(), OxirsError> {
854        // Batch operation - could be optimized
855        for triple in triples {
856            self.store_triple(triple).await?;
857        }
858        Ok(())
859    }
860
861    async fn query_triples(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
862        let start = std::time::Instant::now();
863
864        // Check cache first
865        if self.config.caching {
866            let pattern_key = format!("{pattern:?}");
867            let mut cache = self.cache.write().await;
868            if let Some(cached) = cache.query_cache.get(&pattern_key).cloned() {
869                cache.stats.hits += 1;
870                return Ok(cached);
871            }
872            cache.stats.misses += 1;
873        }
874
875        // Route read
876        let target_backends = self.route_read().await?;
877        let backends = self.backends.read().await;
878
879        let mut all_results = Vec::new();
880        let mut seen = std::collections::HashSet::new();
881
882        for backend_name in &target_backends {
883            if let Some(backend) = backends.get(backend_name) {
884                match backend.query_triples(pattern).await {
885                    Ok(results) => {
886                        for triple in results {
887                            let hash = self.hash_triple(&triple);
888                            if seen.insert(hash) {
889                                all_results.push(triple);
890                            }
891                        }
892                    }
893                    Err(e) => {
894                        tracing::warn!("Query failed on backend {}: {}", backend_name, e);
895                    }
896                }
897            }
898        }
899
900        // Update cache
901        if self.config.caching && !all_results.is_empty() {
902            let pattern_key = format!("{pattern:?}");
903            let mut cache = self.cache.write().await;
904            cache.query_cache.put(pattern_key, all_results.clone());
905        }
906
907        // Update statistics
908        let elapsed = start.elapsed();
909        let mut stats = self.stats.write().await;
910        stats.total_operations += 1;
911        stats.performance.avg_read_latency_ms = (stats.performance.avg_read_latency_ms
912            * (stats.total_operations - 1) as f64
913            + elapsed.as_millis() as f64)
914            / stats.total_operations as f64;
915
916        Ok(all_results)
917    }
918
919    async fn delete_triples(&self, pattern: &TriplePattern) -> Result<usize, OxirsError> {
920        // Route to all writable backends
921        let target_backends = self.route_write().await?;
922        let backends = self.backends.read().await;
923
924        let mut total_deleted = 0;
925        for backend_name in &target_backends {
926            if let Some(backend) = backends.get(backend_name) {
927                match backend.delete_triples(pattern).await {
928                    Ok(count) => total_deleted = total_deleted.max(count),
929                    Err(e) => {
930                        tracing::warn!("Delete failed on backend {}: {}", backend_name, e);
931                    }
932                }
933            }
934        }
935
936        // Invalidate cache
937        if self.config.caching {
938            let mut cache = self.cache.write().await;
939            cache.query_cache.clear();
940            // Could be more selective about cache invalidation
941        }
942
943        Ok(total_deleted)
944    }
945
946    async fn stats(&self) -> Result<super::StorageStats, OxirsError> {
947        let backends = self.backends.read().await;
948        let stats = self.stats.read().await;
949
950        // Aggregate stats from all backends
951        let mut total_triples = 0u64;
952        let mut total_size = 0u64;
953
954        for (_, backend) in backends.iter() {
955            if let Ok(backend_stats) = backend.stats().await {
956                total_triples += backend_stats.total_triples;
957                total_size += backend_stats.total_size_bytes;
958            }
959        }
960
961        Ok(super::StorageStats {
962            total_triples,
963            total_size_bytes: total_size,
964            tier_stats: super::TierStats {
965                hot: super::TierStat {
966                    triple_count: 0,
967                    size_bytes: 0,
968                    hit_rate: 0.0,
969                    avg_access_time_us: 0,
970                },
971                warm: super::TierStat {
972                    triple_count: 0,
973                    size_bytes: 0,
974                    hit_rate: 0.0,
975                    avg_access_time_us: 0,
976                },
977                cold: super::TierStat {
978                    triple_count: 0,
979                    size_bytes: 0,
980                    hit_rate: 0.0,
981                    avg_access_time_us: 0,
982                },
983                archive: super::TierStat {
984                    triple_count: 0,
985                    size_bytes: 0,
986                    hit_rate: 0.0,
987                    avg_access_time_us: 0,
988                },
989            },
990            compression_ratio: 1.0,
991            query_metrics: super::QueryMetrics {
992                avg_query_time_ms: stats.performance.avg_read_latency_ms,
993                p99_query_time_ms: stats.performance.avg_read_latency_ms * 2.0, // Estimate
994                qps: stats.performance.query_throughput_qps,
995                cache_hit_rate: if self.config.caching {
996                    let cache = self.cache.read().await;
997                    let total = cache.stats.hits + cache.stats.misses;
998                    if total > 0 {
999                        (cache.stats.hits as f64 / total as f64) * 100.0
1000                    } else {
1001                        0.0
1002                    }
1003                } else {
1004                    0.0
1005                },
1006            },
1007        })
1008    }
1009
1010    async fn optimize(&self) -> Result<(), OxirsError> {
1011        let backends = self.backends.read().await;
1012
1013        // Optimize all backends
1014        for (name, backend) in backends.iter() {
1015            if let Err(e) = backend.optimize().await {
1016                tracing::warn!("Optimization failed on backend {}: {}", name, e);
1017            }
1018        }
1019
1020        // Clear cache to force refresh
1021        if self.config.caching {
1022            let mut cache = self.cache.write().await;
1023            cache.query_cache.clear();
1024        }
1025
1026        Ok(())
1027    }
1028
1029    async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
1030        std::fs::create_dir_all(path)?;
1031
1032        let backends = self.backends.read().await;
1033
1034        // Backup each backend
1035        for (name, backend) in backends.iter() {
1036            let backend_path = path.join(name);
1037            backend.backup(&backend_path).await?;
1038        }
1039
1040        // Save virtual storage metadata
1041        let metadata = VirtualStorageMetadata {
1042            config: self.config.clone(),
1043            migration_history: {
1044                let state = self.migration_state.read().await;
1045                state.history.clone()
1046            },
1047        };
1048
1049        let metadata_path = path.join("virtual_metadata.json");
1050        let metadata_json = serde_json::to_string_pretty(&metadata)
1051            .map_err(|e| OxirsError::Serialize(e.to_string()))?;
1052        std::fs::write(metadata_path, metadata_json)?;
1053
1054        Ok(())
1055    }
1056
1057    async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
1058        // Load metadata
1059        let metadata_path = path.join("virtual_metadata.json");
1060        let metadata_json = std::fs::read_to_string(metadata_path)?;
1061        let metadata: VirtualStorageMetadata =
1062            serde_json::from_str(&metadata_json).map_err(|e| OxirsError::Parse(e.to_string()))?;
1063
1064        let backends = self.backends.read().await;
1065
1066        // Restore each backend
1067        for (name, backend) in backends.iter() {
1068            let backend_path = path.join(name);
1069            if backend_path.exists() {
1070                backend.restore(&backend_path).await?;
1071            }
1072        }
1073
1074        // Restore migration history
1075        let mut state = self.migration_state.write().await;
1076        state.history = metadata.migration_history;
1077
1078        Ok(())
1079    }
1080}
1081
1082/// Virtual storage metadata
1083#[derive(Debug, Serialize, Deserialize)]
1084struct VirtualStorageMetadata {
1085    config: VirtualStorageConfig,
1086    migration_history: Vec<MigrationRecord>,
1087}
1088
1089impl VirtualStorage {
1090    /// Hash a triple for deduplication
1091    fn hash_triple(&self, triple: &Triple) -> u64 {
1092        use std::hash::{Hash, Hasher};
1093        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1094        triple.hash(&mut hasher);
1095        hasher.finish()
1096    }
1097}
1098
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Returns a scratch directory path under `/tmp`, made unique by the current
    /// Unix time in milliseconds.
    fn unique_test_dir() -> String {
        let millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("operation should succeed")
            .as_millis();
        format!("/tmp/oxirs_virtual_test_{}", millis)
    }

    /// Builds the tiered backend configuration with all on-disk tiers placed
    /// below `test_dir`.
    fn tiered_backend_settings(test_dir: &str) -> serde_json::Value {
        serde_json::json!({
            "enable_tiering": true,
            "enable_columnar": false,
            "enable_temporal": false,
            "compression": {
                "Zstd": { "level": 3 }
            },
            "tiers": {
                "hot_tier": {
                    "max_size_mb": 1024,
                    "eviction_policy": "Lru",
                    "ttl_seconds": 3600
                },
                "warm_tier": {
                    "path": format!("{test_dir}/warm"),
                    "max_size_gb": 10,
                    "promotion_threshold": 10,
                    "demotion_threshold_days": 7
                },
                "cold_tier": {
                    "path": format!("{test_dir}/cold"),
                    "max_size_tb": 1,
                    "compression_level": 9,
                    "archive_threshold_days": 90
                },
                "archive_tier": {
                    "backend": { "Local": format!("{test_dir}/archive") },
                    "retention_years": 7,
                    "immutable": true
                }
            },
            "cache_size_mb": 512
        })
    }

    /// Round-trip smoke test: store one triple, query with a match-all pattern,
    /// and expect exactly that triple back.
    #[tokio::test]
    #[ignore = "Virtual storage backends not yet implemented - all backend types are disabled"]
    async fn test_virtual_storage() {
        let test_dir = unique_test_dir();

        let mut config = VirtualStorageConfig {
            path: PathBuf::from(&test_dir),
            ..Default::default()
        };

        // Point the first backend at test-specific paths.
        if let Some(primary) = config.backends.first_mut() {
            primary.config = tiered_backend_settings(&test_dir);
        }

        let storage = VirtualStorage::new(config)
            .await
            .expect("async operation should succeed");
        storage
            .initialize_backends()
            .await
            .expect("async operation should succeed");

        // Assemble the triple under test from its three terms.
        let subject = NamedNode::new("http://example.org/s").expect("valid IRI");
        let predicate = NamedNode::new("http://example.org/p").expect("valid IRI");
        let object = crate::model::Object::Literal(Literal::new("test"));
        let triple = Triple::new(subject, predicate, object);

        storage
            .store_triple(&triple)
            .await
            .expect("async operation should succeed");

        // A pattern with no bound positions matches everything.
        let match_all = TriplePattern::new(None, None, None);
        let found = storage
            .query_triples(&match_all)
            .await
            .expect("async operation should succeed");
        assert_eq!(found.len(), 1);
        assert_eq!(found[0], triple);
    }
}