// oxirs_vec/tiering/storage_backends.rs

//! Storage backends for the hot, warm, and cold index tiers.
2
3use anyhow::Result;
4use std::path::{Path, PathBuf};
5
6/// Storage backend trait for tier implementations
7pub trait StorageBackend: Send + Sync {
8    /// Load an index from storage
9    fn load_index(&self, index_id: &str) -> Result<Vec<u8>>;
10
11    /// Save an index to storage
12    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()>;
13
14    /// Delete an index from storage
15    fn delete_index(&mut self, index_id: &str) -> Result<()>;
16
17    /// Check if an index exists
18    fn exists(&self, index_id: &str) -> bool;
19
20    /// Get the size of an index in bytes
21    fn get_size(&self, index_id: &str) -> Result<u64>;
22
23    /// List all indices in this storage
24    fn list_indices(&self) -> Result<Vec<String>>;
25
26    /// Get storage backend type name
27    fn backend_type(&self) -> &'static str;
28}
29
/// Hot tier storage: every index payload is kept in process memory.
pub struct HotTierStorage {
    /// Index payloads keyed by index id, shared behind a mutex.
    cache: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
}

impl HotTierStorage {
    /// Builds an empty hot tier.
    pub fn new() -> Self {
        let entries = std::collections::HashMap::new();
        Self {
            cache: std::sync::Arc::new(std::sync::Mutex::new(entries)),
        }
    }

    /// Returns the total number of payload bytes currently cached.
    ///
    /// Only the payload lengths are summed; keys are not counted.
    ///
    /// # Panics
    /// Panics if the cache mutex is poisoned.
    pub fn memory_usage(&self) -> u64 {
        self.cache
            .lock()
            .unwrap()
            .values()
            .fold(0u64, |total, payload| total + payload.len() as u64)
    }

    /// Returns how many indices are currently cached.
    ///
    /// # Panics
    /// Panics if the cache mutex is poisoned.
    pub fn cache_size(&self) -> usize {
        self.cache.lock().unwrap().len()
    }
}

impl Default for HotTierStorage {
    /// Equivalent to [`HotTierStorage::new`].
    fn default() -> Self {
        HotTierStorage::new()
    }
}
62
63impl StorageBackend for HotTierStorage {
64    fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
65        let cache = self.cache.lock().unwrap();
66        cache
67            .get(index_id)
68            .cloned()
69            .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
70    }
71
72    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
73        let mut cache = self.cache.lock().unwrap();
74        cache.insert(index_id.to_string(), data.to_vec());
75        Ok(())
76    }
77
78    fn delete_index(&mut self, index_id: &str) -> Result<()> {
79        let mut cache = self.cache.lock().unwrap();
80        cache
81            .remove(index_id)
82            .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))?;
83        Ok(())
84    }
85
86    fn exists(&self, index_id: &str) -> bool {
87        let cache = self.cache.lock().unwrap();
88        cache.contains_key(index_id)
89    }
90
91    fn get_size(&self, index_id: &str) -> Result<u64> {
92        let cache = self.cache.lock().unwrap();
93        cache
94            .get(index_id)
95            .map(|v| v.len() as u64)
96            .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
97    }
98
99    fn list_indices(&self) -> Result<Vec<String>> {
100        let cache = self.cache.lock().unwrap();
101        Ok(cache.keys().cloned().collect())
102    }
103
104    fn backend_type(&self) -> &'static str {
105        "HotTier (In-Memory)"
106    }
107}
108
109/// Warm tier storage: Memory-mapped files
110pub struct WarmTierStorage {
111    /// Base directory for storage
112    base_path: PathBuf,
113    /// Compression enabled
114    compression_enabled: bool,
115    /// Compression level
116    compression_level: i32,
117}
118
119impl WarmTierStorage {
120    /// Create a new warm tier storage
121    pub fn new<P: AsRef<Path>>(
122        base_path: P,
123        compression_enabled: bool,
124        compression_level: i32,
125    ) -> Result<Self> {
126        let base_path = base_path.as_ref().to_path_buf();
127        std::fs::create_dir_all(&base_path)?;
128
129        Ok(Self {
130            base_path,
131            compression_enabled,
132            compression_level,
133        })
134    }
135
136    /// Get path for an index
137    fn get_index_path(&self, index_id: &str) -> PathBuf {
138        let filename = if self.compression_enabled {
139            format!("{}.idx.zst", index_id)
140        } else {
141            format!("{}.idx", index_id)
142        };
143        self.base_path.join(filename)
144    }
145}
146
147impl StorageBackend for WarmTierStorage {
148    fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
149        let path = self.get_index_path(index_id);
150        let data = std::fs::read(&path)?;
151
152        if self.compression_enabled {
153            Ok(zstd::decode_all(&data[..])?)
154        } else {
155            Ok(data)
156        }
157    }
158
159    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
160        let path = self.get_index_path(index_id);
161
162        let final_data = if self.compression_enabled {
163            zstd::encode_all(data, self.compression_level)?
164        } else {
165            data.to_vec()
166        };
167
168        std::fs::write(&path, final_data)?;
169        Ok(())
170    }
171
172    fn delete_index(&mut self, index_id: &str) -> Result<()> {
173        let path = self.get_index_path(index_id);
174        std::fs::remove_file(&path)?;
175        Ok(())
176    }
177
178    fn exists(&self, index_id: &str) -> bool {
179        self.get_index_path(index_id).exists()
180    }
181
182    fn get_size(&self, index_id: &str) -> Result<u64> {
183        let path = self.get_index_path(index_id);
184        Ok(std::fs::metadata(&path)?.len())
185    }
186
187    fn list_indices(&self) -> Result<Vec<String>> {
188        let mut indices = Vec::new();
189        for entry in std::fs::read_dir(&self.base_path)? {
190            let entry = entry?;
191            if let Some(filename) = entry.file_name().to_str() {
192                if filename.ends_with(".idx") || filename.ends_with(".idx.zst") {
193                    let index_id = filename
194                        .trim_end_matches(".idx.zst")
195                        .trim_end_matches(".idx")
196                        .to_string();
197                    indices.push(index_id);
198                }
199            }
200        }
201        Ok(indices)
202    }
203
204    fn backend_type(&self) -> &'static str {
205        "WarmTier (Memory-Mapped)"
206    }
207}
208
209/// Cold tier storage: Compressed disk storage
210pub struct ColdTierStorage {
211    /// Base directory for storage
212    base_path: PathBuf,
213    /// Compression level (high)
214    compression_level: i32,
215}
216
217impl ColdTierStorage {
218    /// Create a new cold tier storage
219    pub fn new<P: AsRef<Path>>(base_path: P, compression_level: i32) -> Result<Self> {
220        let base_path = base_path.as_ref().to_path_buf();
221        std::fs::create_dir_all(&base_path)?;
222
223        Ok(Self {
224            base_path,
225            compression_level,
226        })
227    }
228
229    /// Get path for an index
230    fn get_index_path(&self, index_id: &str) -> PathBuf {
231        self.base_path.join(format!("{}.idx.zst", index_id))
232    }
233}
234
235impl StorageBackend for ColdTierStorage {
236    fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
237        let path = self.get_index_path(index_id);
238        let compressed_data = std::fs::read(&path)?;
239        Ok(zstd::decode_all(&compressed_data[..])?)
240    }
241
242    fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
243        let path = self.get_index_path(index_id);
244        let compressed_data = zstd::encode_all(data, self.compression_level)?;
245        std::fs::write(&path, compressed_data)?;
246        Ok(())
247    }
248
249    fn delete_index(&mut self, index_id: &str) -> Result<()> {
250        let path = self.get_index_path(index_id);
251        std::fs::remove_file(&path)?;
252        Ok(())
253    }
254
255    fn exists(&self, index_id: &str) -> bool {
256        self.get_index_path(index_id).exists()
257    }
258
259    fn get_size(&self, index_id: &str) -> Result<u64> {
260        let path = self.get_index_path(index_id);
261        Ok(std::fs::metadata(&path)?.len())
262    }
263
264    fn list_indices(&self) -> Result<Vec<String>> {
265        let mut indices = Vec::new();
266        for entry in std::fs::read_dir(&self.base_path)? {
267            let entry = entry?;
268            if let Some(filename) = entry.file_name().to_str() {
269                if filename.ends_with(".idx.zst") {
270                    let index_id = filename.trim_end_matches(".idx.zst").to_string();
271                    indices.push(index_id);
272                }
273            }
274        }
275        Ok(indices)
276    }
277
278    fn backend_type(&self) -> &'static str {
279        "ColdTier (Compressed Disk)"
280    }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes written and read back by every tier test.
    const PAYLOAD: [u8; 5] = [1, 2, 3, 4, 5];

    /// Saves `PAYLOAD` under `"test"`, then checks presence, reads it back,
    /// deletes it, and checks it is gone.
    fn round_trip(storage: &mut dyn StorageBackend) {
        storage.save_index("test", &PAYLOAD).unwrap();
        assert!(storage.exists("test"));

        let loaded = storage.load_index("test").unwrap();
        assert_eq!(loaded, PAYLOAD);

        storage.delete_index("test").unwrap();
        assert!(!storage.exists("test"));
    }

    /// The in-memory tier stores, sizes, reloads, and deletes a payload.
    #[test]
    fn test_hot_tier_storage() {
        let mut storage = HotTierStorage::new();

        storage.save_index("test", &PAYLOAD).unwrap();
        assert!(storage.exists("test"));
        assert_eq!(storage.get_size("test").unwrap(), 5);

        let loaded = storage.load_index("test").unwrap();
        assert_eq!(loaded, PAYLOAD);

        storage.delete_index("test").unwrap();
        assert!(!storage.exists("test"));
    }

    /// The warm tier round-trips a payload with compression enabled (level 6).
    #[test]
    fn test_warm_tier_storage() {
        let dir = std::env::temp_dir().join("oxirs_warm_tier_test");
        std::fs::create_dir_all(&dir).unwrap();

        let mut storage = WarmTierStorage::new(&dir, true, 6).unwrap();
        round_trip(&mut storage);

        // Best-effort cleanup; a failure here is ignored.
        std::fs::remove_dir_all(&dir).ok();
    }

    /// The cold tier round-trips a payload at compression level 19.
    #[test]
    fn test_cold_tier_storage() {
        let dir = std::env::temp_dir().join("oxirs_cold_tier_test");
        std::fs::create_dir_all(&dir).unwrap();

        let mut storage = ColdTierStorage::new(&dir, 19).unwrap();
        round_trip(&mut storage);

        // Best-effort cleanup; a failure here is ignored.
        std::fs::remove_dir_all(&dir).ok();
    }
}