1#[cfg(feature = "rocksdb")]
9mod tiered_impl {
10 #[allow(unused_imports)] use super::*;
12 use crate::model::{Triple, TriplePattern};
13 use crate::storage::{
14 ArchiveBackend, CompressionType, QueryMetrics, StorageConfig, StorageEngine, StorageStats,
15 TierStat, TierStats,
16 };
17 use crate::OxirsError;
18 use dashmap::DashMap;
19 use lru::LruCache;
20 use parking_lot::Mutex;
21 use serde::{Deserialize, Serialize};
22 use std::collections::HashMap;
23 use std::path::{Path, PathBuf};
24 use std::sync::atomic::{AtomicU64, Ordering};
25 use std::sync::Arc;
26 use std::time::{Duration, SystemTime};
27 use tokio::sync::RwLock;
28
    /// Per-triple access bookkeeping used to decide tier placement.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct AccessInfo {
        /// Wall-clock time of the most recent read of this triple.
        last_access: SystemTime,
        /// Number of times the triple has been returned by a query.
        access_count: u64,
        /// Tier the triple currently resides in.
        tier: StorageTier,
        /// Serialized size of the triple, used for aggregate size stats.
        size_bytes: usize,
    }
41
    /// Which physical tier a triple currently lives in.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    enum StorageTier {
        /// In-memory LRU cache (fastest, bounded capacity).
        Hot,
        /// Uncompressed RocksDB store for recently used data.
        Warm,
        /// Compressed RocksDB store for rarely used data.
        Cold,
        /// Long-term archive backend (migration not implemented yet).
        Archive,
    }
50
    /// A triple plus the bookkeeping persisted alongside it.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct StoredTriple {
        /// The RDF triple itself.
        triple: Triple,
        /// Creation/access/compression metadata used for tier management.
        metadata: TripleMetadata,
    }
57
    /// Per-triple metadata driving tier placement and integrity checks.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TripleMetadata {
        /// When the triple was first stored.
        created_at: SystemTime,
        /// When the triple was last modified.
        modified_at: SystemTime,
        /// Access statistics consumed by the promotion/demotion pass.
        access_info: AccessInfo,
        /// Content hash (the same value used as the storage key).
        content_hash: u64,
        /// Set when the stored bytes are compressed; `None` for plain data.
        compression: Option<CompressionInfo>,
    }
72
    /// Details of how a stored value was compressed.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct CompressionInfo {
        /// Size in bytes before compression.
        original_size: usize,
        /// Size in bytes after compression.
        compressed_size: usize,
        /// Name of the compression algorithm used.
        algorithm: String,
    }
83
    /// Four-tier (hot/warm/cold/archive) triple store.
    ///
    /// Hot is an in-memory LRU; warm and cold are RocksDB databases (cold
    /// values additionally pass through a compressor); archive is a
    /// placeholder backend. A background task periodically migrates triples
    /// between tiers based on their access statistics.
    pub struct TieredStorageEngine {
        /// Engine configuration (tier paths, thresholds, compression).
        config: StorageConfig,
        /// In-memory LRU cache keyed by triple content hash.
        hot_tier: Arc<Mutex<LruCache<u64, StoredTriple>>>,
        /// Persistent, uncompressed tier for recently used triples.
        warm_tier: Arc<RwLock<WarmTier>>,
        /// Persistent, compressed tier for rarely used triples.
        cold_tier: Arc<RwLock<ColdTier>>,
        /// Long-term archive tier (migration into it not implemented yet).
        archive_tier: Arc<RwLock<ArchiveTier>>,
        /// Content-hash -> access info for every stored triple, across tiers.
        index: Arc<DashMap<u64, AccessInfo>>,
        /// Lock-free counters backing `stats()`.
        stats: Arc<Statistics>,
        /// Handle of the background tier-management task.
        /// NOTE(review): this handle is never aborted, so the spawned task
        /// (which holds clones of all tier handles) outlives a dropped engine.
        background_handle: Option<tokio::task::JoinHandle<()>>,
    }
103
    /// Persistent warm tier: uncompressed RocksDB plus access counting.
    struct WarmTier {
        /// Directory the database lives in (kept for diagnostics).
        _path: PathBuf,
        /// RocksDB database holding encoded `StoredTriple`s keyed by hash.
        storage: rocksdb::DB,
        /// Per-hash read counts consumed by the promotion pass.
        access_tracker: HashMap<u64, u64>,
    }
110
    /// Persistent cold tier: RocksDB holding compressed values.
    struct ColdTier {
        /// Directory the database lives in (kept for diagnostics).
        _path: PathBuf,
        /// RocksDB database holding compressed, encoded `StoredTriple`s.
        storage: rocksdb::DB,
        /// Compressor applied when demoting data into this tier.
        compression: compression::Compressor,
    }
117
    /// Archive tier placeholder; migration into it is not implemented yet.
    struct ArchiveTier {
        /// Where archived data would be written (local dir, object store, ...).
        _backend: ArchiveBackend,
        /// Hash -> location lookup for archived triples.
        _index: HashMap<u64, ArchiveLocation>,
    }
123
    /// Location of one archived record inside an archive file.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct ArchiveLocation {
        /// Archive file containing the record.
        file_path: String,
        /// Byte offset of the record within the file.
        offset: u64,
        /// Record length in bytes.
        size: usize,
        /// Integrity checksum of the record.
        checksum: u64,
    }
136
    /// Lock-free counters backing `StorageEngine::stats`.
    ///
    /// NOTE(review): per-tier counts are updated by store/delete but not by
    /// the background promotion/demotion pass (which has no access to this
    /// struct), so they drift over time — confirm whether that is acceptable.
    struct Statistics {
        /// Total triples across all tiers.
        total_triples: AtomicU64,
        /// Triples currently in the hot tier.
        hot_count: AtomicU64,
        /// Triples currently in the warm tier.
        warm_count: AtomicU64,
        /// Triples currently in the cold tier.
        cold_count: AtomicU64,
        /// Triples currently in the archive tier.
        archive_count: AtomicU64,
        /// Sum of serialized triple sizes, in bytes.
        total_size: AtomicU64,
        /// Queries answered from the hot tier.
        hot_hits: AtomicU64,
        /// Queries answered from the warm tier.
        warm_hits: AtomicU64,
        /// Queries answered from the cold tier.
        cold_hits: AtomicU64,
        /// Total queries executed.
        total_queries: AtomicU64,
    }
150
151 impl TieredStorageEngine {
152 pub async fn create(config: StorageConfig) -> Result<Arc<dyn StorageEngine>, OxirsError> {
154 let hot_capacity = config.tiers.hot_tier.max_size_mb * 1024 * 1024 / 1000; let hot_tier = Arc::new(Mutex::new(LruCache::new(
157 std::num::NonZeroUsize::new(hot_capacity)
158 .unwrap_or(std::num::NonZeroUsize::new(10000).expect("constant is non-zero")),
159 )));
160
161 let warm_path = PathBuf::from(&config.tiers.warm_tier.path);
163 std::fs::create_dir_all(&warm_path)?;
164 let mut warm_opts = rocksdb::Options::default();
165 warm_opts.create_if_missing(true);
166 let warm_storage = rocksdb::DB::open(&warm_opts, warm_path.join("data"))?;
167 let warm_tier = Arc::new(RwLock::new(WarmTier {
168 _path: warm_path,
169 storage: warm_storage,
170 access_tracker: HashMap::new(),
171 }));
172
173 let cold_path = PathBuf::from(&config.tiers.cold_tier.path);
175 std::fs::create_dir_all(&cold_path)?;
176 let mut cold_opts = rocksdb::Options::default();
177 cold_opts.create_if_missing(true);
178 let cold_storage = rocksdb::DB::open(&cold_opts, cold_path.join("data"))?;
179 let cold_tier = Arc::new(RwLock::new(ColdTier {
180 _path: cold_path,
181 storage: cold_storage,
182 compression: compression::Compressor::new(config.compression.clone()),
183 }));
184
185 let archive_tier = Arc::new(RwLock::new(ArchiveTier {
187 _backend: config.tiers.archive_tier.backend.clone(),
188 _index: HashMap::new(),
189 }));
190
191 let index = Arc::new(DashMap::new());
193 let stats = Arc::new(Statistics {
194 total_triples: AtomicU64::new(0),
195 hot_count: AtomicU64::new(0),
196 warm_count: AtomicU64::new(0),
197 cold_count: AtomicU64::new(0),
198 archive_count: AtomicU64::new(0),
199 total_size: AtomicU64::new(0),
200 hot_hits: AtomicU64::new(0),
201 warm_hits: AtomicU64::new(0),
202 cold_hits: AtomicU64::new(0),
203 total_queries: AtomicU64::new(0),
204 });
205
206 let mut engine = TieredStorageEngine {
207 config,
208 hot_tier,
209 warm_tier,
210 cold_tier,
211 archive_tier,
212 index,
213 stats,
214 background_handle: None,
215 };
216
217 engine.start_background_tasks();
219
220 Ok(Arc::new(engine))
221 }
222
223 fn start_background_tasks(&mut self) {
225 let hot_tier = self.hot_tier.clone();
226 let warm_tier = self.warm_tier.clone();
227 let cold_tier = self.cold_tier.clone();
228 let archive_tier = self.archive_tier.clone();
229 let index = self.index.clone();
230 let config = self.config.clone();
231
232 let handle = tokio::spawn(async move {
233 let mut interval = tokio::time::interval(Duration::from_secs(60));
234
235 loop {
236 interval.tick().await;
237
238 if let Err(e) = Self::manage_tiers(
240 &hot_tier,
241 &warm_tier,
242 &cold_tier,
243 &archive_tier,
244 &index,
245 &config,
246 )
247 .await
248 {
249 tracing::error!("Tier management error: {}", e);
250 }
251 }
252 });
253
254 self.background_handle = Some(handle);
255 }
256
257 async fn manage_tiers(
259 hot_tier: &Arc<Mutex<LruCache<u64, StoredTriple>>>,
260 warm_tier: &Arc<RwLock<WarmTier>>,
261 cold_tier: &Arc<RwLock<ColdTier>>,
262 archive_tier: &Arc<RwLock<ArchiveTier>>,
263 index: &Arc<DashMap<u64, AccessInfo>>,
264 config: &StorageConfig,
265 ) -> Result<(), OxirsError> {
266 let now = SystemTime::now();
267
268 {
270 let mut warm = warm_tier.write().await;
271 let mut to_promote = Vec::new();
272 let mut to_demote = Vec::new();
273
274 for (hash, access_count) in &warm.access_tracker {
275 if *access_count >= config.tiers.warm_tier.promotion_threshold as u64 {
276 to_promote.push(*hash);
277 } else if let Some(info) = index.get(hash) {
278 let days_since_access = now
279 .duration_since(info.last_access)
280 .unwrap_or(Duration::ZERO)
281 .as_secs()
282 / 86400;
283
284 if days_since_access
285 >= config.tiers.warm_tier.demotion_threshold_days as u64
286 {
287 to_demote.push(*hash);
288 }
289 }
290 }
291
292 for hash in to_promote {
294 if let Ok(Some(data)) = warm.storage.get(hash.to_be_bytes()) {
295 if let Ok((triple, _)) = oxicode::serde::decode_from_slice::<StoredTriple, _>(
296 &data,
297 oxicode::config::standard(),
298 ) {
299 hot_tier.lock().put(hash, triple);
300 warm.storage.delete(hash.to_be_bytes())?;
301 warm.access_tracker.remove(&hash);
302
303 if let Some(mut info) = index.get_mut(&hash) {
304 info.tier = StorageTier::Hot;
305 }
306 }
307 }
308 }
309
310 let cold = cold_tier.write().await;
312 for hash in to_demote {
313 if let Ok(Some(data)) = warm.storage.get(hash.to_be_bytes()) {
314 let compressed = cold.compression.compress(&data)?;
316 cold.storage.put(hash.to_be_bytes(), compressed)?;
317 warm.storage.delete(hash.to_be_bytes())?;
318 warm.access_tracker.remove(&hash);
319
320 if let Some(mut info) = index.get_mut(&hash) {
321 info.tier = StorageTier::Cold;
322 }
323 }
324 }
325 }
326
327 {
329 let _cold = cold_tier.read().await;
330 let mut to_archive = Vec::new();
331
332 for entry in index.iter() {
333 let (hash, info) = entry.pair();
334 if info.tier == StorageTier::Cold {
335 let days_since_access = now
336 .duration_since(info.last_access)
337 .unwrap_or(Duration::ZERO)
338 .as_secs()
339 / 86400;
340
341 if days_since_access >= config.tiers.cold_tier.archive_threshold_days as u64
342 {
343 to_archive.push(*hash);
344 }
345 }
346 }
347
348 if !to_archive.is_empty() {
350 let _archive = archive_tier.write().await;
351 }
354 }
355
356 Ok(())
357 }
358
359 fn hash_triple(triple: &Triple) -> u64 {
361 use std::hash::{Hash, Hasher};
362 let mut hasher = std::collections::hash_map::DefaultHasher::new();
363 triple.hash(&mut hasher);
364 hasher.finish()
365 }
366
367 fn determine_initial_tier(&self, _triple: &Triple) -> StorageTier {
369 StorageTier::Warm
373 }
374 }
375
376 #[async_trait::async_trait]
377 impl StorageEngine for TieredStorageEngine {
378 async fn init(&mut self, config: StorageConfig) -> Result<(), OxirsError> {
379 self.config = config;
380 Ok(())
381 }
382
383 async fn store_triple(&self, triple: &Triple) -> Result<(), OxirsError> {
384 let hash = Self::hash_triple(triple);
385 let now = SystemTime::now();
386
387 if self.index.contains_key(&hash) {
389 return Ok(());
390 }
391
392 let temp_serialized =
395 oxicode::serde::encode_to_vec(triple, oxicode::config::standard())?;
396 let size_bytes = temp_serialized.len();
397
398 let stored = StoredTriple {
399 triple: triple.clone(),
400 metadata: TripleMetadata {
401 created_at: now,
402 modified_at: now,
403 access_info: AccessInfo {
404 last_access: now,
405 access_count: 0,
406 tier: self.determine_initial_tier(triple),
407 size_bytes,
408 },
409 content_hash: hash,
410 compression: None,
411 },
412 };
413
414 match stored.metadata.access_info.tier {
416 StorageTier::Hot => {
417 self.hot_tier.lock().put(hash, stored.clone());
418 self.stats.hot_count.fetch_add(1, Ordering::Relaxed);
419 }
420 StorageTier::Warm => {
421 let data = oxicode::serde::encode_to_vec(&stored, oxicode::config::standard())?;
422 self.warm_tier
423 .write()
424 .await
425 .storage
426 .put(hash.to_be_bytes(), data)?;
427 self.stats.warm_count.fetch_add(1, Ordering::Relaxed);
428 }
429 _ => unreachable!("New triples should not go directly to cold/archive"),
430 }
431
432 self.index.insert(hash, stored.metadata.access_info.clone());
434 self.stats.total_triples.fetch_add(1, Ordering::Relaxed);
435 self.stats.total_size.fetch_add(
436 stored.metadata.access_info.size_bytes as u64,
437 Ordering::Relaxed,
438 );
439
440 Ok(())
441 }
442
443 async fn store_triples(&self, triples: &[Triple]) -> Result<(), OxirsError> {
444 #[cfg(feature = "parallel")]
446 {
447 use rayon::prelude::*;
448 triples
449 .par_iter()
450 .try_for_each(|triple| futures::executor::block_on(self.store_triple(triple)))
451 }
452 #[cfg(not(feature = "parallel"))]
453 {
454 for triple in triples {
455 self.store_triple(triple).await?;
456 }
457 Ok(())
458 }
459 }
460
461 async fn query_triples(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
462 self.stats.total_queries.fetch_add(1, Ordering::Relaxed);
463 let mut results = Vec::new();
464
465 {
467 let hot = self.hot_tier.lock();
468 for (_, stored) in hot.iter() {
469 if pattern.matches(&stored.triple) {
470 results.push(stored.triple.clone());
471 self.stats.hot_hits.fetch_add(1, Ordering::Relaxed);
472 }
473 }
474 }
475
476 if results.is_empty() {
478 let warm = self.warm_tier.read().await;
479 let iter = warm.storage.iterator(rocksdb::IteratorMode::Start);
481 for (_key, value) in iter.flatten() {
482 if let Ok((stored, _)) = oxicode::serde::decode_from_slice::<StoredTriple, _>(
483 &value,
484 oxicode::config::standard(),
485 ) {
486 if pattern.matches(&stored.triple) {
487 results.push(stored.triple.clone());
488 self.stats.warm_hits.fetch_add(1, Ordering::Relaxed);
489 }
490 }
491 }
492 }
493
494 let now = SystemTime::now();
496 for triple in &results {
497 let hash = Self::hash_triple(triple);
498 if let Some(mut info) = self.index.get_mut(&hash) {
499 info.last_access = now;
500 info.access_count += 1;
501
502 if info.tier == StorageTier::Warm {
504 if let Ok(mut warm) = self.warm_tier.try_write() {
505 *warm.access_tracker.entry(hash).or_insert(0) += 1;
506 }
507 }
508 }
509 }
510
511 Ok(results)
512 }
513
514 async fn delete_triples(&self, pattern: &TriplePattern) -> Result<usize, OxirsError> {
515 let triples = self.query_triples(pattern).await?;
516 let count = triples.len();
517
518 for triple in triples {
519 let hash = Self::hash_triple(&triple);
520
521 if let Some((_, info)) = self.index.remove(&hash) {
523 match info.tier {
525 StorageTier::Hot => {
526 self.hot_tier.lock().pop(&hash);
527 self.stats.hot_count.fetch_sub(1, Ordering::Relaxed);
528 }
529 StorageTier::Warm => {
530 self.warm_tier
531 .write()
532 .await
533 .storage
534 .delete(hash.to_be_bytes())?;
535 self.stats.warm_count.fetch_sub(1, Ordering::Relaxed);
536 }
537 StorageTier::Cold => {
538 self.cold_tier
539 .write()
540 .await
541 .storage
542 .delete(hash.to_be_bytes())?;
543 self.stats.cold_count.fetch_sub(1, Ordering::Relaxed);
544 }
545 StorageTier::Archive => {
546 if !self.config.tiers.archive_tier.immutable {
548 }
550 }
551 }
552
553 self.stats.total_triples.fetch_sub(1, Ordering::Relaxed);
554 self.stats
555 .total_size
556 .fetch_sub(info.size_bytes as u64, Ordering::Relaxed);
557 }
558 }
559
560 Ok(count)
561 }
562
563 async fn stats(&self) -> Result<StorageStats, OxirsError> {
564 let total_queries = self.stats.total_queries.load(Ordering::Relaxed);
565 let hot_hits = self.stats.hot_hits.load(Ordering::Relaxed);
566 let warm_hits = self.stats.warm_hits.load(Ordering::Relaxed);
567 let cold_hits = self.stats.cold_hits.load(Ordering::Relaxed);
568 let total_hits = hot_hits + warm_hits + cold_hits;
569
570 Ok(StorageStats {
571 total_triples: self.stats.total_triples.load(Ordering::Relaxed),
572 total_size_bytes: self.stats.total_size.load(Ordering::Relaxed),
573 tier_stats: TierStats {
574 hot: TierStat {
575 triple_count: self.stats.hot_count.load(Ordering::Relaxed),
576 size_bytes: 0, hit_rate: if total_queries > 0 {
578 (hot_hits as f64 / total_queries as f64) * 100.0
579 } else {
580 0.0
581 },
582 avg_access_time_us: 1, },
584 warm: TierStat {
585 triple_count: self.stats.warm_count.load(Ordering::Relaxed),
586 size_bytes: 0, hit_rate: if total_queries > 0 {
588 (warm_hits as f64 / total_queries as f64) * 100.0
589 } else {
590 0.0
591 },
592 avg_access_time_us: 100, },
594 cold: TierStat {
595 triple_count: self.stats.cold_count.load(Ordering::Relaxed),
596 size_bytes: 0, hit_rate: if total_queries > 0 {
598 (cold_hits as f64 / total_queries as f64) * 100.0
599 } else {
600 0.0
601 },
602 avg_access_time_us: 10000, },
604 archive: TierStat {
605 triple_count: self.stats.archive_count.load(Ordering::Relaxed),
606 size_bytes: 0, hit_rate: 0.0, avg_access_time_us: 1000000, },
610 },
611 compression_ratio: 1.5, query_metrics: QueryMetrics {
613 avg_query_time_ms: 0.1, p99_query_time_ms: 1.0, qps: if total_queries > 0 { 1000.0 } else { 0.0 }, cache_hit_rate: if total_queries > 0 {
617 (total_hits as f64 / total_queries as f64) * 100.0
618 } else {
619 0.0
620 },
621 },
622 })
623 }
624
625 async fn optimize(&self) -> Result<(), OxirsError> {
626 self.warm_tier
628 .read()
629 .await
630 .storage
631 .compact_range(None::<&[u8]>, None::<&[u8]>);
632 self.cold_tier
633 .read()
634 .await
635 .storage
636 .compact_range(None::<&[u8]>, None::<&[u8]>);
637
638 Self::manage_tiers(
640 &self.hot_tier,
641 &self.warm_tier,
642 &self.cold_tier,
643 &self.archive_tier,
644 &self.index,
645 &self.config,
646 )
647 .await?;
648
649 Ok(())
650 }
651
652 async fn backup(&self, path: &Path) -> Result<(), OxirsError> {
653 std::fs::create_dir_all(path)?;
655
656 let metadata = BackupMetadata {
658 version: 1,
659 created_at: SystemTime::now(),
660 total_triples: self.stats.total_triples.load(Ordering::Relaxed),
661 config: self.config.clone(),
662 };
663
664 let metadata_path = path.join("metadata.json");
665 let metadata_json = serde_json::to_string_pretty(&metadata)?;
666 std::fs::write(metadata_path, metadata_json)?;
667
668 let hot_backup = path.join("hot.bin");
671 let hot_data: Vec<_> = self
672 .hot_tier
673 .lock()
674 .iter()
675 .map(|(k, v)| (*k, v.clone()))
676 .collect();
677 let hot_bytes = oxicode::serde::encode_to_vec(&hot_data, oxicode::config::standard())?;
678 std::fs::write(hot_backup, hot_bytes)?;
679
680 let warm_backup = path.join("warm.bin");
682 let warm_data: Vec<(Vec<u8>, Vec<u8>)> = {
683 let warm = self.warm_tier.read().await;
684 let mut data = Vec::new();
685 let iter = warm.storage.iterator(rocksdb::IteratorMode::Start);
686 for (key, value) in iter.flatten() {
687 data.push((key.to_vec(), value.to_vec()));
688 }
689 data
690 };
691 let warm_bytes =
692 oxicode::serde::encode_to_vec(&warm_data, oxicode::config::standard())?;
693 std::fs::write(warm_backup, warm_bytes)?;
694
695 let cold_backup = path.join("cold.bin");
696 let cold_data: Vec<(Vec<u8>, Vec<u8>)> = {
697 let cold = self.cold_tier.read().await;
698 let mut data = Vec::new();
699 let iter = cold.storage.iterator(rocksdb::IteratorMode::Start);
700 for (key, value) in iter.flatten() {
701 data.push((key.to_vec(), value.to_vec()));
702 }
703 data
704 };
705 let cold_bytes =
706 oxicode::serde::encode_to_vec(&cold_data, oxicode::config::standard())?;
707 std::fs::write(cold_backup, cold_bytes)?;
708
709 let index_backup = path.join("index.bin");
711 let index_data: HashMap<_, _> = self
712 .index
713 .iter()
714 .map(|entry| (*entry.key(), entry.value().clone()))
715 .collect();
716 let index_bytes =
717 oxicode::serde::encode_to_vec(&index_data, oxicode::config::standard())?;
718 std::fs::write(index_backup, index_bytes)?;
719
720 Ok(())
721 }
722
723 async fn restore(&self, path: &Path) -> Result<(), OxirsError> {
724 let metadata_path = path.join("metadata.json");
726 let metadata_json = std::fs::read_to_string(metadata_path)?;
727 let metadata: BackupMetadata = serde_json::from_str(&metadata_json)?;
728
729 let hot_backup = path.join("hot.bin");
731 if hot_backup.exists() {
732 let hot_bytes = std::fs::read(hot_backup)?;
733 let hot_data: Vec<(u64, StoredTriple)> =
734 oxicode::serde::decode_from_slice(&hot_bytes, oxicode::config::standard())
735 .map(|(v, _)| v)?;
736
737 let mut hot = self.hot_tier.lock();
738 hot.clear();
739 for (k, v) in hot_data {
740 hot.put(k, v);
741 }
742 }
743
744 let warm_backup = path.join("warm.bin");
746 if warm_backup.exists() {
747 let warm_bytes = std::fs::read(warm_backup)?;
748 let warm_data: Vec<(Vec<u8>, Vec<u8>)> =
749 oxicode::serde::decode_from_slice(&warm_bytes, oxicode::config::standard())
750 .map(|(v, _)| v)?;
751
752 let warm = self.warm_tier.write().await;
753 for (key, value) in warm_data {
754 warm.storage.put(&key, &value)?;
755 }
756 }
757
758 let cold_backup = path.join("cold.bin");
760 if cold_backup.exists() {
761 let cold_bytes = std::fs::read(cold_backup)?;
762 let cold_data: Vec<(Vec<u8>, Vec<u8>)> =
763 oxicode::serde::decode_from_slice(&cold_bytes, oxicode::config::standard())
764 .map(|(v, _)| v)?;
765
766 let cold = self.cold_tier.write().await;
767 for (key, value) in cold_data {
768 cold.storage.put(&key, &value)?;
769 }
770 }
771
772 let index_backup = path.join("index.bin");
774 if index_backup.exists() {
775 let index_bytes = std::fs::read(index_backup)?;
776 let index_data: HashMap<u64, AccessInfo> =
777 oxicode::serde::decode_from_slice(&index_bytes, oxicode::config::standard())
778 .map(|(v, _)| v)?;
779
780 self.index.clear();
781 for (k, v) in index_data {
782 self.index.insert(k, v);
783 }
784 }
785
786 self.stats
788 .total_triples
789 .store(metadata.total_triples, Ordering::Relaxed);
790
791 Ok(())
792 }
793 }
794
    /// Header written as `metadata.json` at the root of a backup directory.
    #[derive(Debug, Serialize, Deserialize)]
    struct BackupMetadata {
        /// Backup format version (currently 1).
        version: u32,
        /// When the backup was taken.
        created_at: SystemTime,
        /// Triple count at backup time; restored into the stats counters.
        total_triples: u64,
        /// Engine configuration in effect when the backup was taken.
        config: StorageConfig,
    }
803
804 mod compression {
806 use super::*;
807
808 pub struct Compressor {
809 _compression_type: CompressionType,
810 }
811
812 impl Compressor {
813 pub fn new(compression_type: CompressionType) -> Self {
814 Compressor {
815 _compression_type: compression_type,
816 }
817 }
818
819 pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
820 Ok(data.to_vec())
822 }
823
824 #[allow(dead_code)]
825 pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, OxirsError> {
826 Ok(data.to_vec())
828 }
829 }
830 }
831
832 #[cfg(test)]
833 mod tests {
834 use super::*;
835 use crate::model::{Literal, NamedNode};
836 use crate::storage::ArchiveBackend;
837
838 #[tokio::test]
839 async fn test_tiered_storage() {
840 let test_dir = format!(
841 "/tmp/oxirs_tiered_test_{}",
842 std::time::SystemTime::now()
843 .duration_since(std::time::UNIX_EPOCH)
844 .expect("operation should succeed")
845 .as_millis()
846 );
847
848 let mut config = StorageConfig::default();
849 config.tiers.warm_tier.path = format!("{}/warm", test_dir);
850 config.tiers.cold_tier.path = format!("{}/cold", test_dir);
851 config.tiers.archive_tier.backend =
852 ArchiveBackend::Local(format!("{}/archive", test_dir));
853
854 let engine = TieredStorageEngine::create(config)
855 .await
856 .expect("async operation should succeed");
857
858 let subject = NamedNode::new("http://example.org/subject").expect("valid IRI");
860 let predicate = NamedNode::new("http://example.org/predicate").expect("valid IRI");
861 let object = crate::model::Object::Literal(Literal::new("test"));
862 let triple = Triple::new(subject, predicate, object);
863
864 engine
866 .store_triple(&triple)
867 .await
868 .expect("async operation should succeed");
869
870 let pattern = TriplePattern::new(None, None, None);
872 let results = engine
873 .query_triples(&pattern)
874 .await
875 .expect("async operation should succeed");
876 assert_eq!(results.len(), 1);
877 assert_eq!(results[0], triple);
878
879 let stats = engine
881 .stats()
882 .await
883 .expect("async operation should succeed");
884 assert_eq!(stats.total_triples, 1);
885 }
886 }
887} #[cfg(feature = "rocksdb")]
891pub use tiered_impl::*;
892
#[cfg(not(feature = "rocksdb"))]
/// Stub used when the `rocksdb` feature is disabled; every constructor
/// returns `OxirsError::NotSupported` at runtime.
pub struct TieredStorageEngine;
896
897#[cfg(not(feature = "rocksdb"))]
898impl TieredStorageEngine {
899 pub async fn new(_config: crate::storage::StorageConfig) -> Result<Self, crate::OxirsError> {
900 Err(crate::OxirsError::NotSupported(
901 "TieredStorageEngine requires rocksdb feature".to_string(),
902 ))
903 }
904}