Skip to main content

oxirs_vec/tiering/
storage_backends.rs

1//! Storage backends for different tiers
2
3use anyhow::Result;
4use std::path::{Path, PathBuf};
5
6/// Storage backend trait for tier implementations
7pub trait StorageBackend: Send + Sync {
8    /// Load an index from storage
9    fn load_index(&self, index_id: &str) -> Result<Vec<u8>>;
10
11    /// Save an index to storage
12    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()>;
13
14    /// Delete an index from storage
15    fn delete_index(&mut self, index_id: &str) -> Result<()>;
16
17    /// Check if an index exists
18    fn exists(&self, index_id: &str) -> bool;
19
20    /// Get the size of an index in bytes
21    fn get_size(&self, index_id: &str) -> Result<u64>;
22
23    /// List all indices in this storage
24    fn list_indices(&self) -> Result<Vec<String>>;
25
26    /// Get storage backend type name
27    fn backend_type(&self) -> &'static str;
28}
29
30/// Hot tier storage: In-memory storage
31pub struct HotTierStorage {
32    /// In-memory cache of indices
33    cache: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
34}
35
36impl HotTierStorage {
37    /// Create a new hot tier storage
38    pub fn new() -> Self {
39        Self {
40            cache: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
41        }
42    }
43
44    /// Get current memory usage in bytes
45    pub fn memory_usage(&self) -> u64 {
46        let cache = self.cache.lock().expect("lock should not be poisoned");
47        cache.values().map(|v| v.len() as u64).sum()
48    }
49
50    /// Get number of cached indices
51    pub fn cache_size(&self) -> usize {
52        let cache = self.cache.lock().expect("lock should not be poisoned");
53        cache.len()
54    }
55}
56
57impl Default for HotTierStorage {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl StorageBackend for HotTierStorage {
64    fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
65        let cache = self.cache.lock().expect("lock should not be poisoned");
66        cache
67            .get(index_id)
68            .cloned()
69            .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
70    }
71
72    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
73        let mut cache = self.cache.lock().expect("lock should not be poisoned");
74        cache.insert(index_id.to_string(), data.to_vec());
75        Ok(())
76    }
77
78    fn delete_index(&mut self, index_id: &str) -> Result<()> {
79        let mut cache = self.cache.lock().expect("lock should not be poisoned");
80        cache
81            .remove(index_id)
82            .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))?;
83        Ok(())
84    }
85
86    fn exists(&self, index_id: &str) -> bool {
87        let cache = self.cache.lock().expect("lock should not be poisoned");
88        cache.contains_key(index_id)
89    }
90
91    fn get_size(&self, index_id: &str) -> Result<u64> {
92        let cache = self.cache.lock().expect("lock should not be poisoned");
93        cache
94            .get(index_id)
95            .map(|v| v.len() as u64)
96            .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
97    }
98
99    fn list_indices(&self) -> Result<Vec<String>> {
100        let cache = self.cache.lock().expect("lock should not be poisoned");
101        Ok(cache.keys().cloned().collect())
102    }
103
104    fn backend_type(&self) -> &'static str {
105        "HotTier (In-Memory)"
106    }
107}
108
109/// Warm tier storage: Memory-mapped files
110pub struct WarmTierStorage {
111    /// Base directory for storage
112    base_path: PathBuf,
113    /// Compression enabled
114    compression_enabled: bool,
115    /// Compression level
116    compression_level: i32,
117}
118
119impl WarmTierStorage {
120    /// Create a new warm tier storage
121    pub fn new<P: AsRef<Path>>(
122        base_path: P,
123        compression_enabled: bool,
124        compression_level: i32,
125    ) -> Result<Self> {
126        let base_path = base_path.as_ref().to_path_buf();
127        std::fs::create_dir_all(&base_path)?;
128
129        Ok(Self {
130            base_path,
131            compression_enabled,
132            compression_level,
133        })
134    }
135
136    /// Get path for an index
137    fn get_index_path(&self, index_id: &str) -> PathBuf {
138        let filename = if self.compression_enabled {
139            format!("{}.idx.zst", index_id)
140        } else {
141            format!("{}.idx", index_id)
142        };
143        self.base_path.join(filename)
144    }
145}
146
147impl StorageBackend for WarmTierStorage {
148    fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
149        let path = self.get_index_path(index_id);
150        let data = std::fs::read(&path)?;
151
152        if self.compression_enabled {
153            Ok(oxiarc_zstd::decode_all(&data)
154                .map_err(|e| anyhow::anyhow!("Zstd decompression failed: {}", e))?)
155        } else {
156            Ok(data)
157        }
158    }
159
160    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
161        let path = self.get_index_path(index_id);
162
163        let final_data = if self.compression_enabled {
164            oxiarc_zstd::encode_all(data, self.compression_level)
165                .map_err(|e| anyhow::anyhow!("Zstd compression failed: {}", e))?
166        } else {
167            data.to_vec()
168        };
169
170        std::fs::write(&path, final_data)?;
171        Ok(())
172    }
173
174    fn delete_index(&mut self, index_id: &str) -> Result<()> {
175        let path = self.get_index_path(index_id);
176        std::fs::remove_file(&path)?;
177        Ok(())
178    }
179
180    fn exists(&self, index_id: &str) -> bool {
181        self.get_index_path(index_id).exists()
182    }
183
184    fn get_size(&self, index_id: &str) -> Result<u64> {
185        let path = self.get_index_path(index_id);
186        Ok(std::fs::metadata(&path)?.len())
187    }
188
189    fn list_indices(&self) -> Result<Vec<String>> {
190        let mut indices = Vec::new();
191        for entry in std::fs::read_dir(&self.base_path)? {
192            let entry = entry?;
193            if let Some(filename) = entry.file_name().to_str() {
194                if filename.ends_with(".idx") || filename.ends_with(".idx.zst") {
195                    let index_id = filename
196                        .trim_end_matches(".idx.zst")
197                        .trim_end_matches(".idx")
198                        .to_string();
199                    indices.push(index_id);
200                }
201            }
202        }
203        Ok(indices)
204    }
205
206    fn backend_type(&self) -> &'static str {
207        "WarmTier (Memory-Mapped)"
208    }
209}
210
211/// Cold tier storage: Compressed disk storage
212pub struct ColdTierStorage {
213    /// Base directory for storage
214    base_path: PathBuf,
215    /// Compression level (high)
216    compression_level: i32,
217}
218
219impl ColdTierStorage {
220    /// Create a new cold tier storage
221    pub fn new<P: AsRef<Path>>(base_path: P, compression_level: i32) -> Result<Self> {
222        let base_path = base_path.as_ref().to_path_buf();
223        std::fs::create_dir_all(&base_path)?;
224
225        Ok(Self {
226            base_path,
227            compression_level,
228        })
229    }
230
231    /// Get path for an index
232    fn get_index_path(&self, index_id: &str) -> PathBuf {
233        self.base_path.join(format!("{}.idx.zst", index_id))
234    }
235}
236
237impl StorageBackend for ColdTierStorage {
238    fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
239        let path = self.get_index_path(index_id);
240        let compressed_data = std::fs::read(&path)?;
241        oxiarc_zstd::decode_all(&compressed_data)
242            .map_err(|e| anyhow::anyhow!("Zstd decompression failed: {}", e))
243    }
244
245    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
246        let path = self.get_index_path(index_id);
247        let compressed_data = oxiarc_zstd::encode_all(data, self.compression_level)
248            .map_err(|e| anyhow::anyhow!("Zstd compression failed: {}", e))?;
249        std::fs::write(&path, compressed_data)?;
250        Ok(())
251    }
252
253    fn delete_index(&mut self, index_id: &str) -> Result<()> {
254        let path = self.get_index_path(index_id);
255        std::fs::remove_file(&path)?;
256        Ok(())
257    }
258
259    fn exists(&self, index_id: &str) -> bool {
260        self.get_index_path(index_id).exists()
261    }
262
263    fn get_size(&self, index_id: &str) -> Result<u64> {
264        let path = self.get_index_path(index_id);
265        Ok(std::fs::metadata(&path)?.len())
266    }
267
268    fn list_indices(&self) -> Result<Vec<String>> {
269        let mut indices = Vec::new();
270        for entry in std::fs::read_dir(&self.base_path)? {
271            let entry = entry?;
272            if let Some(filename) = entry.file_name().to_str() {
273                if filename.ends_with(".idx.zst") {
274                    let index_id = filename.trim_end_matches(".idx.zst").to_string();
275                    indices.push(index_id);
276                }
277            }
278        }
279        Ok(indices)
280    }
281
282    fn backend_type(&self) -> &'static str {
283        "ColdTier (Compressed Disk)"
284    }
285}
286
287#[cfg(test)]
288mod tests {
289    use super::*;
290
291    #[test]
292    fn test_hot_tier_storage() {
293        let mut storage = HotTierStorage::new();
294
295        let data = vec![1, 2, 3, 4, 5];
296        storage.save_index("test", &data).unwrap();
297
298        assert!(storage.exists("test"));
299        assert_eq!(storage.get_size("test").unwrap(), 5);
300
301        let loaded = storage.load_index("test").unwrap();
302        assert_eq!(loaded, data);
303
304        storage.delete_index("test").unwrap();
305        assert!(!storage.exists("test"));
306    }
307
308    #[test]
309    fn test_warm_tier_storage() {
310        use std::env;
311        let temp_dir = env::temp_dir().join("oxirs_warm_tier_test");
312        std::fs::create_dir_all(&temp_dir).unwrap();
313
314        let mut storage = WarmTierStorage::new(&temp_dir, true, 6).unwrap();
315
316        let data = vec![1, 2, 3, 4, 5];
317        storage.save_index("test", &data).unwrap();
318
319        assert!(storage.exists("test"));
320
321        let loaded = storage.load_index("test").unwrap();
322        assert_eq!(loaded, data);
323
324        storage.delete_index("test").unwrap();
325        assert!(!storage.exists("test"));
326
327        std::fs::remove_dir_all(&temp_dir).ok();
328    }
329
330    #[test]
331    fn test_cold_tier_storage() {
332        use std::env;
333        let temp_dir = env::temp_dir().join("oxirs_cold_tier_test");
334        std::fs::create_dir_all(&temp_dir).unwrap();
335
336        let mut storage = ColdTierStorage::new(&temp_dir, 19).unwrap();
337
338        let data = vec![1, 2, 3, 4, 5];
339        storage.save_index("test", &data).unwrap();
340
341        assert!(storage.exists("test"));
342
343        let loaded = storage.load_index("test").unwrap();
344        assert_eq!(loaded, data);
345
346        storage.delete_index("test").unwrap();
347        assert!(!storage.exists("test"));
348
349        std::fs::remove_dir_all(&temp_dir).ok();
350    }
351}