use crate::model::{Triple, TriplePattern};
use crate::storage::StorageEngine;
use crate::OxirsError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
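/// Top-level configuration for the virtual storage layer: the root path, the
/// set of backends to multiplex over, and the routing, migration, and caching
/// policies applied across them.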
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VirtualStorageConfig {
pub path: PathBuf,
pub backends: Vec<BackendConfig>,
pub routing: RoutingPolicy,
pub migration: MigrationPolicy,
pub caching: bool,
pub cache_size_mb: usize,
}
impl Default for VirtualStorageConfig {
fn default() -> Self {
VirtualStorageConfig {
path: PathBuf::from("/var/oxirs/virtual"),
backends: vec![BackendConfig {
name: "primary".to_string(),
backend_type: BackendType::Tiered,
config: serde_json::json!({
"enable_tiering": true,
"enable_columnar": false,
"enable_temporal": false,
"compression": {
"Zstd": { "level": 3 }
},
"tiers": {
"hot_tier": {
"max_size_mb": 1024,
"eviction_policy": "Lru",
"ttl_seconds": 3600
},
"warm_tier": {
"path": "/tmp/oxirs_virtual_warm",
"max_size_gb": 10,
"promotion_threshold": 10,
"demotion_threshold_days": 7
},
"cold_tier": {
"path": "/tmp/oxirs_virtual_cold",
"max_size_tb": 1,
"compression_level": 9,
"archive_threshold_days": 90
},
"archive_tier": {
"backend": { "Local": "/tmp/oxirs_virtual_archive" },
"retention_years": 7,
"immutable": true
}
},
"cache_size_mb": 512
}),
weight: 1.0,
read_only: false,
}],
routing: RoutingPolicy::default(),
migration: MigrationPolicy::default(),
caching: true,
cache_size_mb: 1024,
}
}
}
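/// Configuration for a single named backend, carrying its type-specific JSON
/// settings, a routing weight, and a read-only flag.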
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendConfig {
pub name: String,
pub backend_type: BackendType,
pub config: serde_json::Value,
pub weight: f64,
pub read_only: bool,
}
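/// The kind of storage engine backing a [`BackendConfig`].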
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackendType {
Tiered,
Columnar,
Immutable,
Temporal,
Remote { endpoint: String },
Cloud { provider: CloudProvider },
}
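/// Cloud object-store targets usable with [`BackendType::Cloud`].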
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CloudProvider {
AWS { bucket: String, region: String },
GCP { bucket: String, project: String },
Azure { container: String, account: String },
}
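/// How reads and writes are distributed across backends, plus optional
/// per-query hints and per-predicate routing rules.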
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingPolicy {
pub read_strategy: ReadStrategy,
pub write_strategy: WriteStrategy,
pub query_hints: HashMap<String, String>,
pub predicate_rules: HashMap<String, String>,
}
impl Default for RoutingPolicy {
fn default() -> Self {
RoutingPolicy {
read_strategy: ReadStrategy::FirstAvailable,
write_strategy: WriteStrategy::All,
query_hints: HashMap::new(),
predicate_rules: HashMap::new(),
}
}
}
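/// Strategy for choosing which backend(s) serve a read.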
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReadStrategy {
FirstAvailable,
RoundRobin,
WeightedRandom,
Broadcast,
PatternBased,
}
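/// Strategy for choosing which backend(s) receive a write.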
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WriteStrategy {
All,
PrimaryOnly,
Quorum { n: usize },
Selective,
}
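/// Policy controlling how and when data is migrated between backends.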
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPolicy {
pub auto_migrate: bool,
pub trigger: MigrationTrigger,
pub batch_size: usize,
pub rate_limit: Option<usize>,
pub rules: Vec<MigrationRule>,
}
impl Default for MigrationPolicy {
fn default() -> Self {
MigrationPolicy {
auto_migrate: false,
trigger: MigrationTrigger::Manual,
batch_size: 10000,
rate_limit: Some(100000),
rules: Vec::new(),
}
}
}
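/// Condition that triggers an automatic migration.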
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MigrationTrigger {
Manual,
StorageThreshold(f64),
Periodic(u32),
CostOptimization,
Performance,
}
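/// A named rule that moves triples matching `criteria` from a source backend
/// to a target backend, ordered by `priority`.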
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationRule {
pub name: String,
pub source: String,
pub target: String,
pub criteria: SelectionCriteria,
pub priority: u32,
}
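/// Predicate used to select which triples a migration rule applies to.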
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SelectionCriteria {
Age(u32),
AccessFrequency { threshold: u32 },
Size(usize),
PredicatePattern(String),
Custom(String),
}
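/// A virtual storage engine that multiplexes reads and writes across multiple
/// named backends with configurable routing, online migration, caching, and
/// background health monitoring.
///
/// A minimal usage sketch. Note that every built-in backend constructor
/// currently returns an error (see `create_backend`), so `initialize_backends`
/// is expected to fail until at least one backend type is integrated:
///
/// ```ignore
/// let storage = VirtualStorage::new(VirtualStorageConfig::default()).await?;
/// storage.initialize_backends().await?;
/// let job_id = storage
///     .start_migration("primary", "archive", SelectionCriteria::Age(90))
///     .await?;
/// let job = storage.get_migration_status(&job_id).await?;
/// ```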
pub struct VirtualStorage {
config: VirtualStorageConfig,
backends: Arc<RwLock<HashMap<String, Arc<dyn StorageEngine>>>>,
routing_state: Arc<RwLock<RoutingState>>,
migration_state: Arc<RwLock<MigrationState>>,
cache: Arc<RwLock<StorageCache>>,
stats: Arc<RwLock<VirtualStorageStats>>,
}
struct RoutingState {
round_robin_counter: usize,
backend_health: HashMap<String, BackendHealth>,
#[allow(dead_code)]
active_migrations: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BackendHealth {
healthy: bool,
last_check: u64,
failure_count: u32,
avg_response_time_ms: f64,
}
struct MigrationState {
active_migrations: HashMap<String, MigrationJob>,
history: Vec<MigrationRecord>,
#[allow(dead_code)]
coordinator: Option<MigrationCoordinator>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationJob {
id: String,
source: String,
target: String,
progress: MigrationProgress,
start_time: u64,
status: MigrationStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MigrationProgress {
total_triples: u64,
migrated_triples: u64,
failed_triples: u64,
current_batch: u64,
eta: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MigrationStatus {
Pending,
Running,
Paused,
Completed,
Failed(String),
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MigrationRecord {
job_id: String,
source: String,
target: String,
start_time: chrono::DateTime<chrono::Utc>,
end_time: chrono::DateTime<chrono::Utc>,
triples_migrated: u64,
status: MigrationStatus,
}
struct MigrationCoordinator {
#[allow(dead_code)]
workers: Vec<tokio::task::JoinHandle<()>>,
#[allow(dead_code)]
control_tx: tokio::sync::mpsc::Sender<MigrationControl>,
}
#[derive(Debug)]
enum MigrationControl {
#[allow(dead_code)]
Pause,
#[allow(dead_code)]
Resume,
#[allow(dead_code)]
Cancel,
#[allow(dead_code)]
UpdateRateLimit(usize),
}
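/// In-memory LRU caches: individual triples keyed by hash, and whole query
/// results keyed by the pattern's `Debug` representation.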
struct StorageCache {
triple_cache: lru::LruCache<u64, Triple>,
query_cache: lru::LruCache<String, Vec<Triple>>,
stats: CacheStats,
}
#[derive(Debug, Default)]
struct CacheStats {
hits: u64,
misses: u64,
#[allow(dead_code)]
evictions: u64,
}
#[derive(Debug, Default)]
struct VirtualStorageStats {
total_operations: u64,
#[allow(dead_code)]
backend_operations: HashMap<String, u64>,
#[allow(dead_code)]
migration_stats: MigrationStats,
performance: PerformanceMetrics,
}
#[derive(Debug, Default)]
struct MigrationStats {
#[allow(dead_code)]
total_migrations: u64,
#[allow(dead_code)]
successful_migrations: u64,
#[allow(dead_code)]
failed_migrations: u64,
#[allow(dead_code)]
total_triples_migrated: u64,
#[allow(dead_code)]
total_migration_time_sec: u64,
}
#[derive(Debug, Default)]
struct PerformanceMetrics {
avg_read_latency_ms: f64,
avg_write_latency_ms: f64,
query_throughput_qps: f64,
#[allow(dead_code)]
write_throughput_tps: f64,
}
impl VirtualStorage {
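    /// Create a virtual storage instance rooted at `config.path`.
    ///
    /// Backends are not opened here; call [`Self::initialize_backends`]
    /// afterwards to construct and health-check each configured backend.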
pub async fn new(config: VirtualStorageConfig) -> Result<Self, OxirsError> {
std::fs::create_dir_all(&config.path)?;
        // Approximate entry budget for the LRU: convert the configured cache
        // size to bytes and assume roughly 1 KB per cached triple.
        let cache_size = (config.cache_size_mb * 1024 * 1024) / 1000;
Ok(VirtualStorage {
config: config.clone(),
backends: Arc::new(RwLock::new(HashMap::new())),
routing_state: Arc::new(RwLock::new(RoutingState {
round_robin_counter: 0,
backend_health: HashMap::new(),
active_migrations: Vec::new(),
})),
migration_state: Arc::new(RwLock::new(MigrationState {
active_migrations: HashMap::new(),
history: Vec::new(),
coordinator: None,
})),
cache: Arc::new(RwLock::new(StorageCache {
triple_cache: lru::LruCache::new(
std::num::NonZeroUsize::new(cache_size).unwrap_or(
std::num::NonZeroUsize::new(10000).expect("constant is non-zero"),
),
),
query_cache: lru::LruCache::new(
std::num::NonZeroUsize::new(1000).expect("constant is non-zero"),
),
stats: CacheStats::default(),
})),
stats: Arc::new(RwLock::new(VirtualStorageStats::default())),
})
}
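    /// Construct every configured backend, register it as healthy in the
    /// routing table, and start the background health-monitoring task.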
pub async fn initialize_backends(&self) -> Result<(), OxirsError> {
for backend_config in &self.config.backends {
let backend = self.create_backend(backend_config).await?;
let mut backends = self.backends.write().await;
backends.insert(backend_config.name.clone(), backend);
let mut routing = self.routing_state.write().await;
routing.backend_health.insert(
backend_config.name.clone(),
BackendHealth {
healthy: true,
last_check: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs(),
failure_count: 0,
avg_response_time_ms: 0.0,
},
);
}
self.start_health_monitoring();
Ok(())
}
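    /// Start an asynchronous migration of triples matching `criteria` from
    /// `source` to `target`, returning a job id that can be polled with
    /// [`Self::get_migration_status`].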
pub async fn start_migration(
&self,
source: &str,
target: &str,
criteria: SelectionCriteria,
) -> Result<String, OxirsError> {
let job_id = uuid::Uuid::new_v4().to_string();
let job = MigrationJob {
id: job_id.clone(),
source: source.to_string(),
target: target.to_string(),
progress: MigrationProgress {
total_triples: 0,
migrated_triples: 0,
failed_triples: 0,
current_batch: 0,
eta: None,
},
start_time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs(),
status: MigrationStatus::Pending,
};
let mut migration_state = self.migration_state.write().await;
migration_state
.active_migrations
.insert(job_id.clone(), job);
self.spawn_migration_worker(
job_id.clone(),
source.to_string(),
target.to_string(),
criteria,
)
.await?;
Ok(job_id)
}
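    /// Look up an active migration job by its id.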
pub async fn get_migration_status(&self, job_id: &str) -> Result<MigrationJob, OxirsError> {
let migration_state = self.migration_state.read().await;
migration_state
.active_migrations
.get(job_id)
.cloned()
.ok_or_else(|| OxirsError::Store(format!("Migration job not found: {job_id}")))
}
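    /// Construct a backend from its configuration. Every built-in backend type
    /// is currently unimplemented and returns a descriptive error.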
async fn create_backend(
&self,
config: &BackendConfig,
) -> Result<Arc<dyn StorageEngine>, OxirsError> {
match &config.backend_type {
BackendType::Tiered => {
Err(OxirsError::Store(
"Tiered backend temporarily disabled due to RocksDB dependency conflicts"
.to_string(),
))
}
BackendType::Columnar => {
Err(OxirsError::Store(
"Columnar backend not yet integrated".to_string(),
))
}
BackendType::Immutable => {
Err(OxirsError::Store(
"Immutable backend not yet integrated".to_string(),
))
}
BackendType::Temporal => {
Err(OxirsError::Store(
"Temporal backend not yet integrated".to_string(),
))
}
BackendType::Remote { endpoint } => {
Err(OxirsError::Store(format!(
"Remote backend not implemented: {endpoint}"
)))
}
BackendType::Cloud { provider: _ } => {
Err(OxirsError::Store(
"Cloud backend not implemented".to_string(),
))
}
}
}
    /// Select the backend(s) to read from according to the configured read
    /// strategy, considering only healthy, registered backends.
    async fn route_read(&self) -> Result<Vec<String>, OxirsError> {
        // A write lock is needed so the round-robin counter can be advanced;
        // routing decisions are short-lived, so contention stays low.
        let mut routing_state = self.routing_state.write().await;
        let backends = self.backends.read().await;
        match self.config.routing.read_strategy {
            ReadStrategy::FirstAvailable => {
                for (name, health) in &routing_state.backend_health {
                    if health.healthy && backends.contains_key(name) {
                        return Ok(vec![name.clone()]);
                    }
                }
                Err(OxirsError::Store(
                    "No healthy backends available".to_string(),
                ))
            }
            ReadStrategy::RoundRobin => {
                let mut healthy_backends: Vec<_> = routing_state
                    .backend_health
                    .iter()
                    .filter(|(name, health)| health.healthy && backends.contains_key(*name))
                    .map(|(name, _)| name.clone())
                    .collect();
                if healthy_backends.is_empty() {
                    return Err(OxirsError::Store(
                        "No healthy backends available".to_string(),
                    ));
                }
                // HashMap iteration order is unspecified, so sort the names for
                // a stable rotation, then advance the shared counter so that
                // successive reads actually rotate across backends.
                healthy_backends.sort();
                let index = routing_state.round_robin_counter % healthy_backends.len();
                routing_state.round_robin_counter =
                    routing_state.round_robin_counter.wrapping_add(1);
                Ok(vec![healthy_backends[index].clone()])
            }
            ReadStrategy::Broadcast => Ok(routing_state
                .backend_health
                .iter()
                .filter(|(name, health)| health.healthy && backends.contains_key(*name))
                .map(|(name, _)| name.clone())
                .collect()),
            // WeightedRandom and PatternBased routing are not yet implemented;
            // fall back to the primary backend.
            _ => Ok(vec!["primary".to_string()]),
        }
    }
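    /// Select the backend(s) that should receive a write according to the
    /// configured write strategy.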
async fn route_write(&self) -> Result<Vec<String>, OxirsError> {
let routing_state = self.routing_state.read().await;
let backends = self.backends.read().await;
match self.config.routing.write_strategy {
WriteStrategy::All => {
Ok(self
.config
.backends
.iter()
.filter(|b| !b.read_only)
.filter(|b| {
routing_state
.backend_health
.get(&b.name)
.map(|h| h.healthy)
.unwrap_or(false)
})
.filter(|b| backends.contains_key(&b.name))
.map(|b| b.name.clone())
.collect())
}
WriteStrategy::PrimaryOnly => Ok(vec!["primary".to_string()]),
WriteStrategy::Quorum { n } => {
let writable: Vec<_> = self
.config
.backends
.iter()
.filter(|b| !b.read_only)
.filter(|b| {
routing_state
.backend_health
.get(&b.name)
.map(|h| h.healthy)
.unwrap_or(false)
})
.filter(|b| backends.contains_key(&b.name))
.take(n)
.map(|b| b.name.clone())
.collect();
if writable.len() < n {
return Err(OxirsError::Store(format!(
"Not enough healthy backends for quorum: need {}, have {}",
n,
writable.len()
)));
}
Ok(writable)
}
            WriteStrategy::Selective => {
                // Predicate-based selective routing is not yet implemented;
                // fall back to the primary backend.
                Ok(vec!["primary".to_string()])
            }
}
}
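    /// Spawn a background task that probes each backend every 30 seconds via
    /// its `stats()` call, recording the response time and marking a backend
    /// unhealthy after three consecutive failures.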
fn start_health_monitoring(&self) {
let backends = self.backends.clone();
let routing_state = self.routing_state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let backend_list = backends.read().await;
for (name, backend) in backend_list.iter() {
let start = std::time::Instant::now();
let health_check = backend.stats().await;
let elapsed = start.elapsed();
let mut routing = routing_state.write().await;
if let Some(health) = routing.backend_health.get_mut(name) {
health.last_check = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs();
health.avg_response_time_ms = elapsed.as_millis() as f64;
if health_check.is_ok() {
health.healthy = true;
health.failure_count = 0;
} else {
health.failure_count += 1;
if health.failure_count >= 3 {
health.healthy = false;
}
}
}
}
}
});
}
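    /// Spawn the background task that drives a migration job. Currently a
    /// placeholder; see the comment inside for the simulated behavior.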
async fn spawn_migration_worker(
&self,
job_id: String,
_source: String,
_target: String,
_criteria: SelectionCriteria,
) -> Result<(), OxirsError> {
let _backends = self.backends.clone();
let migration_state = self.migration_state.clone();
let _config = self.config.clone();
        // Placeholder worker: the real batch-copy logic is not implemented yet,
        // so this simulates a migration that completes after a short delay.
        tokio::spawn(async move {
            {
                // Scope the first write lock so it is released before sleeping;
                // shadowing the guard would otherwise keep it alive and
                // deadlock on the second `write().await` below.
                let mut state = migration_state.write().await;
                if let Some(job) = state.active_migrations.get_mut(&job_id) {
                    job.status = MigrationStatus::Running;
                }
            }
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            let mut state = migration_state.write().await;
            if let Some(job) = state.active_migrations.get_mut(&job_id) {
                job.status = MigrationStatus::Completed;
                job.progress.migrated_triples = 1000;
            }
        });
Ok(())
}
}
#[async_trait]
impl StorageEngine for VirtualStorage {
async fn init(&mut self, _config: super::StorageConfig) -> Result<(), OxirsError> {
self.initialize_backends().await
}
async fn store_triple(&self, triple: &Triple) -> Result<(), OxirsError> {
let start = std::time::Instant::now();
let target_backends = self.route_write().await?;
let backends = self.backends.read().await;
let mut errors = Vec::new();
for backend_name in &target_backends {
if let Some(backend) = backends.get(backend_name) {
if let Err(e) = backend.store_triple(triple).await {
errors.push((backend_name.clone(), e));
}
}
}
        if self.config.caching {
            let mut cache = self.cache.write().await;
            let hash = self.hash_triple(triple);
            cache.triple_cache.put(hash, triple.clone());
            // Cached query results may no longer be complete after this
            // insert, so drop them to avoid serving stale reads.
            cache.query_cache.clear();
        }
let elapsed = start.elapsed();
let mut stats = self.stats.write().await;
stats.total_operations += 1;
stats.performance.avg_write_latency_ms = (stats.performance.avg_write_latency_ms
* (stats.total_operations - 1) as f64
+ elapsed.as_millis() as f64)
/ stats.total_operations as f64;
match self.config.routing.write_strategy {
WriteStrategy::All => {
if !errors.is_empty() {
return Err(OxirsError::Store(format!(
"Failed to write to backends: {errors:?}"
)));
}
}
WriteStrategy::Quorum { n } => {
if target_backends.len() - errors.len() < n {
return Err(OxirsError::Store(format!(
"Quorum write failed: {} successes, needed {}",
target_backends.len() - errors.len(),
n
)));
}
}
_ => {}
}
Ok(())
}
async fn store_triples(&self, triples: &[Triple]) -> Result<(), OxirsError> {
for triple in triples {
self.store_triple(triple).await?;
}
Ok(())
}
async fn query_triples(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
let start = std::time::Instant::now();
if self.config.caching {
let pattern_key = format!("{pattern:?}");
let mut cache = self.cache.write().await;
if let Some(cached) = cache.query_cache.get(&pattern_key).cloned() {
cache.stats.hits += 1;
return Ok(cached);
}
cache.stats.misses += 1;
}
let target_backends = self.route_read().await?;
let backends = self.backends.read().await;
let mut all_results = Vec::new();
let mut seen = std::collections::HashSet::new();
for backend_name in &target_backends {
if let Some(backend) = backends.get(backend_name) {
match backend.query_triples(pattern).await {
Ok(results) => {
for triple in results {
let hash = self.hash_triple(&triple);
if seen.insert(hash) {
all_results.push(triple);
}
}
}
Err(e) => {
tracing::warn!("Query failed on backend {}: {}", backend_name, e);
}
}
}
}
if self.config.caching && !all_results.is_empty() {
let pattern_key = format!("{pattern:?}");
let mut cache = self.cache.write().await;
cache.query_cache.put(pattern_key, all_results.clone());
}
let elapsed = start.elapsed();
let mut stats = self.stats.write().await;
stats.total_operations += 1;
stats.performance.avg_read_latency_ms = (stats.performance.avg_read_latency_ms
* (stats.total_operations - 1) as f64
+ elapsed.as_millis() as f64)
/ stats.total_operations as f64;
Ok(all_results)
}
async fn delete_triples(&self, pattern: &TriplePattern) -> Result<usize, OxirsError> {
let target_backends = self.route_write().await?;
let backends = self.backends.read().await;
let mut total_deleted = 0;
for backend_name in &target_backends {
if let Some(backend) = backends.get(backend_name) {
match backend.delete_triples(pattern).await {
                    // Backends hold replicas of the same triples, so take the
                    // maximum per-backend count as the number of distinct
                    // triples removed rather than summing duplicates.
                    Ok(count) => total_deleted = total_deleted.max(count),
Err(e) => {
tracing::warn!("Delete failed on backend {}: {}", backend_name, e);
}
}
}
}
if self.config.caching {
let mut cache = self.cache.write().await;
cache.query_cache.clear();
}
Ok(total_deleted)
}
async fn stats(&self) -> Result<super::StorageStats, OxirsError> {
let backends = self.backends.read().await;
let stats = self.stats.read().await;
let mut total_triples = 0u64;
let mut total_size = 0u64;
        for backend in backends.values() {
if let Ok(backend_stats) = backend.stats().await {
total_triples += backend_stats.total_triples;
total_size += backend_stats.total_size_bytes;
}
}
Ok(super::StorageStats {
total_triples,
total_size_bytes: total_size,
tier_stats: super::TierStats {
hot: super::TierStat {
triple_count: 0,
size_bytes: 0,
hit_rate: 0.0,
avg_access_time_us: 0,
},
warm: super::TierStat {
triple_count: 0,
size_bytes: 0,
hit_rate: 0.0,
avg_access_time_us: 0,
},
cold: super::TierStat {
triple_count: 0,
size_bytes: 0,
hit_rate: 0.0,
avg_access_time_us: 0,
},
archive: super::TierStat {
triple_count: 0,
size_bytes: 0,
hit_rate: 0.0,
avg_access_time_us: 0,
},
},
compression_ratio: 1.0,
query_metrics: super::QueryMetrics {
avg_query_time_ms: stats.performance.avg_read_latency_ms,
                // p99 is approximated as twice the running average until real
                // latency percentiles are tracked.
                p99_query_time_ms: stats.performance.avg_read_latency_ms * 2.0,
                qps: stats.performance.query_throughput_qps,
cache_hit_rate: if self.config.caching {
let cache = self.cache.read().await;
let total = cache.stats.hits + cache.stats.misses;
if total > 0 {
(cache.stats.hits as f64 / total as f64) * 100.0
} else {
0.0
}
} else {
0.0
},
},
})
}
async fn optimize(&self) -> Result<(), OxirsError> {
let backends = self.backends.read().await;
for (name, backend) in backends.iter() {
if let Err(e) = backend.optimize().await {
tracing::warn!("Optimization failed on backend {}: {}", name, e);
}
}
if self.config.caching {
let mut cache = self.cache.write().await;
cache.query_cache.clear();
}
Ok(())
}
async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
std::fs::create_dir_all(path)?;
let backends = self.backends.read().await;
for (name, backend) in backends.iter() {
let backend_path = path.join(name);
backend.backup(&backend_path).await?;
}
let metadata = VirtualStorageMetadata {
config: self.config.clone(),
migration_history: {
let state = self.migration_state.read().await;
state.history.clone()
},
};
let metadata_path = path.join("virtual_metadata.json");
let metadata_json = serde_json::to_string_pretty(&metadata)
.map_err(|e| OxirsError::Serialize(e.to_string()))?;
std::fs::write(metadata_path, metadata_json)?;
Ok(())
}
async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
let metadata_path = path.join("virtual_metadata.json");
let metadata_json = std::fs::read_to_string(metadata_path)?;
let metadata: VirtualStorageMetadata =
serde_json::from_str(&metadata_json).map_err(|e| OxirsError::Parse(e.to_string()))?;
let backends = self.backends.read().await;
for (name, backend) in backends.iter() {
let backend_path = path.join(name);
if backend_path.exists() {
backend.restore(&backend_path).await?;
}
}
let mut state = self.migration_state.write().await;
state.history = metadata.migration_history;
Ok(())
}
}
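/// Sidecar metadata written next to per-backend backups so the virtual layer's
/// configuration and migration history can be restored alongside the data.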
#[derive(Debug, Serialize, Deserialize)]
struct VirtualStorageMetadata {
config: VirtualStorageConfig,
migration_history: Vec<MigrationRecord>,
}
impl VirtualStorage {
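    /// Hash a triple with the standard library's `DefaultHasher`. The value is
    /// only used for in-process cache keys and deduplication, so it does not
    /// need to be stable across runs.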
fn hash_triple(&self, triple: &Triple) -> u64 {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
triple.hash(&mut hasher);
hasher.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{Literal, NamedNode};
#[tokio::test]
#[ignore = "Virtual storage backends not yet implemented - all backend types are disabled"]
async fn test_virtual_storage() {
let test_dir = format!(
"/tmp/oxirs_virtual_test_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("operation should succeed")
.as_millis()
);
let mut config = VirtualStorageConfig {
path: PathBuf::from(&test_dir),
..Default::default()
};
if let Some(backend) = config.backends.get_mut(0) {
backend.config = serde_json::json!({
"enable_tiering": true,
"enable_columnar": false,
"enable_temporal": false,
"compression": {
"Zstd": { "level": 3 }
},
"tiers": {
"hot_tier": {
"max_size_mb": 1024,
"eviction_policy": "Lru",
"ttl_seconds": 3600
},
"warm_tier": {
"path": format!("{test_dir}/warm"),
"max_size_gb": 10,
"promotion_threshold": 10,
"demotion_threshold_days": 7
},
"cold_tier": {
"path": format!("{test_dir}/cold"),
"max_size_tb": 1,
"compression_level": 9,
"archive_threshold_days": 90
},
"archive_tier": {
"backend": { "Local": format!("{test_dir}/archive") },
"retention_years": 7,
"immutable": true
}
},
"cache_size_mb": 512
});
}
let storage = VirtualStorage::new(config)
.await
.expect("async operation should succeed");
storage
.initialize_backends()
.await
.expect("async operation should succeed");
let triple = Triple::new(
NamedNode::new("http://example.org/s").expect("valid IRI"),
NamedNode::new("http://example.org/p").expect("valid IRI"),
crate::model::Object::Literal(Literal::new("test")),
);
storage
.store_triple(&triple)
.await
.expect("async operation should succeed");
let pattern = TriplePattern::new(None, None, None);
let results = storage
.query_triples(&pattern)
.await
.expect("async operation should succeed");
assert_eq!(results.len(), 1);
assert_eq!(results[0], triple);
}
}