oxirs_vec/tiering/
storage_backends.rs1use anyhow::Result;
4use std::path::{Path, PathBuf};
5
6pub trait StorageBackend: Send + Sync {
8 fn load_index(&self, index_id: &str) -> Result<Vec<u8>>;
10
11 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()>;
13
14 fn delete_index(&mut self, index_id: &str) -> Result<()>;
16
17 fn exists(&self, index_id: &str) -> bool;
19
20 fn get_size(&self, index_id: &str) -> Result<u64>;
22
23 fn list_indices(&self) -> Result<Vec<String>>;
25
26 fn backend_type(&self) -> &'static str;
28}
29
/// In-memory storage tier: every index is kept as a byte vector in a map.
pub struct HotTierStorage {
    /// Index bytes keyed by index id, guarded by a mutex.
    cache: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
}

impl HotTierStorage {
    /// Creates an empty hot tier.
    pub fn new() -> Self {
        let entries = std::collections::HashMap::new();
        let shared = std::sync::Arc::new(std::sync::Mutex::new(entries));
        Self { cache: shared }
    }

    /// Returns the total number of payload bytes held across all cached
    /// indices (keys and map overhead are not counted).
    ///
    /// # Panics
    /// Panics if the cache mutex is poisoned.
    pub fn memory_usage(&self) -> u64 {
        let guard = self.cache.lock().expect("lock should not be poisoned");
        let mut total_bytes: u64 = 0;
        for bytes in guard.values() {
            total_bytes += bytes.len() as u64;
        }
        total_bytes
    }

    /// Returns how many indices are currently cached.
    ///
    /// # Panics
    /// Panics if the cache mutex is poisoned.
    pub fn cache_size(&self) -> usize {
        self.cache
            .lock()
            .expect("lock should not be poisoned")
            .len()
    }
}

impl Default for HotTierStorage {
    /// Equivalent to [`HotTierStorage::new`].
    fn default() -> Self {
        HotTierStorage::new()
    }
}
62
63impl StorageBackend for HotTierStorage {
64 fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
65 let cache = self.cache.lock().expect("lock should not be poisoned");
66 cache
67 .get(index_id)
68 .cloned()
69 .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
70 }
71
72 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
73 let mut cache = self.cache.lock().expect("lock should not be poisoned");
74 cache.insert(index_id.to_string(), data.to_vec());
75 Ok(())
76 }
77
78 fn delete_index(&mut self, index_id: &str) -> Result<()> {
79 let mut cache = self.cache.lock().expect("lock should not be poisoned");
80 cache
81 .remove(index_id)
82 .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))?;
83 Ok(())
84 }
85
86 fn exists(&self, index_id: &str) -> bool {
87 let cache = self.cache.lock().expect("lock should not be poisoned");
88 cache.contains_key(index_id)
89 }
90
91 fn get_size(&self, index_id: &str) -> Result<u64> {
92 let cache = self.cache.lock().expect("lock should not be poisoned");
93 cache
94 .get(index_id)
95 .map(|v| v.len() as u64)
96 .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
97 }
98
99 fn list_indices(&self) -> Result<Vec<String>> {
100 let cache = self.cache.lock().expect("lock should not be poisoned");
101 Ok(cache.keys().cloned().collect())
102 }
103
104 fn backend_type(&self) -> &'static str {
105 "HotTier (In-Memory)"
106 }
107}
108
/// Disk-backed storage tier with optional zstd compression.
///
/// Each index is stored as one file under `base_path`.
pub struct WarmTierStorage {
    /// Directory that holds the index files.
    base_path: PathBuf,
    /// When `true`, files are zstd-compressed and use the `.idx.zst`
    /// extension; otherwise they are stored raw as `.idx`.
    compression_enabled: bool,
    /// Level handed to the zstd encoder when saving (only used when
    /// compression is enabled).
    compression_level: i32,
}
118
119impl WarmTierStorage {
120 pub fn new<P: AsRef<Path>>(
122 base_path: P,
123 compression_enabled: bool,
124 compression_level: i32,
125 ) -> Result<Self> {
126 let base_path = base_path.as_ref().to_path_buf();
127 std::fs::create_dir_all(&base_path)?;
128
129 Ok(Self {
130 base_path,
131 compression_enabled,
132 compression_level,
133 })
134 }
135
136 fn get_index_path(&self, index_id: &str) -> PathBuf {
138 let filename = if self.compression_enabled {
139 format!("{}.idx.zst", index_id)
140 } else {
141 format!("{}.idx", index_id)
142 };
143 self.base_path.join(filename)
144 }
145}
146
147impl StorageBackend for WarmTierStorage {
148 fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
149 let path = self.get_index_path(index_id);
150 let data = std::fs::read(&path)?;
151
152 if self.compression_enabled {
153 Ok(oxiarc_zstd::decode_all(&data)
154 .map_err(|e| anyhow::anyhow!("Zstd decompression failed: {}", e))?)
155 } else {
156 Ok(data)
157 }
158 }
159
160 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
161 let path = self.get_index_path(index_id);
162
163 let final_data = if self.compression_enabled {
164 oxiarc_zstd::encode_all(data, self.compression_level)
165 .map_err(|e| anyhow::anyhow!("Zstd compression failed: {}", e))?
166 } else {
167 data.to_vec()
168 };
169
170 std::fs::write(&path, final_data)?;
171 Ok(())
172 }
173
174 fn delete_index(&mut self, index_id: &str) -> Result<()> {
175 let path = self.get_index_path(index_id);
176 std::fs::remove_file(&path)?;
177 Ok(())
178 }
179
180 fn exists(&self, index_id: &str) -> bool {
181 self.get_index_path(index_id).exists()
182 }
183
184 fn get_size(&self, index_id: &str) -> Result<u64> {
185 let path = self.get_index_path(index_id);
186 Ok(std::fs::metadata(&path)?.len())
187 }
188
189 fn list_indices(&self) -> Result<Vec<String>> {
190 let mut indices = Vec::new();
191 for entry in std::fs::read_dir(&self.base_path)? {
192 let entry = entry?;
193 if let Some(filename) = entry.file_name().to_str() {
194 if filename.ends_with(".idx") || filename.ends_with(".idx.zst") {
195 let index_id = filename
196 .trim_end_matches(".idx.zst")
197 .trim_end_matches(".idx")
198 .to_string();
199 indices.push(index_id);
200 }
201 }
202 }
203 Ok(indices)
204 }
205
206 fn backend_type(&self) -> &'static str {
207 "WarmTier (Memory-Mapped)"
208 }
209}
210
/// Disk-backed storage tier that always zstd-compresses its index files.
///
/// Each index is stored as `<id>.idx.zst` under `base_path`.
pub struct ColdTierStorage {
    /// Directory that holds the compressed index files.
    base_path: PathBuf,
    /// Level handed to the zstd encoder when saving.
    compression_level: i32,
}
218
219impl ColdTierStorage {
220 pub fn new<P: AsRef<Path>>(base_path: P, compression_level: i32) -> Result<Self> {
222 let base_path = base_path.as_ref().to_path_buf();
223 std::fs::create_dir_all(&base_path)?;
224
225 Ok(Self {
226 base_path,
227 compression_level,
228 })
229 }
230
231 fn get_index_path(&self, index_id: &str) -> PathBuf {
233 self.base_path.join(format!("{}.idx.zst", index_id))
234 }
235}
236
237impl StorageBackend for ColdTierStorage {
238 fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
239 let path = self.get_index_path(index_id);
240 let compressed_data = std::fs::read(&path)?;
241 oxiarc_zstd::decode_all(&compressed_data)
242 .map_err(|e| anyhow::anyhow!("Zstd decompression failed: {}", e))
243 }
244
245 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
246 let path = self.get_index_path(index_id);
247 let compressed_data = oxiarc_zstd::encode_all(data, self.compression_level)
248 .map_err(|e| anyhow::anyhow!("Zstd compression failed: {}", e))?;
249 std::fs::write(&path, compressed_data)?;
250 Ok(())
251 }
252
253 fn delete_index(&mut self, index_id: &str) -> Result<()> {
254 let path = self.get_index_path(index_id);
255 std::fs::remove_file(&path)?;
256 Ok(())
257 }
258
259 fn exists(&self, index_id: &str) -> bool {
260 self.get_index_path(index_id).exists()
261 }
262
263 fn get_size(&self, index_id: &str) -> Result<u64> {
264 let path = self.get_index_path(index_id);
265 Ok(std::fs::metadata(&path)?.len())
266 }
267
268 fn list_indices(&self) -> Result<Vec<String>> {
269 let mut indices = Vec::new();
270 for entry in std::fs::read_dir(&self.base_path)? {
271 let entry = entry?;
272 if let Some(filename) = entry.file_name().to_str() {
273 if filename.ends_with(".idx.zst") {
274 let index_id = filename.trim_end_matches(".idx.zst").to_string();
275 indices.push(index_id);
276 }
277 }
278 }
279 Ok(indices)
280 }
281
282 fn backend_type(&self) -> &'static str {
283 "ColdTier (Compressed Disk)"
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Saves, checks, loads and deletes one index on a disk-backed tier
    /// rooted in a scratch directory under the system temp dir, then
    /// removes that directory (best effort).
    fn check_disk_round_trip<S, F>(dir_name: &str, make_storage: F)
    where
        S: StorageBackend,
        F: FnOnce(&std::path::Path) -> S,
    {
        let scratch = std::env::temp_dir().join(dir_name);
        std::fs::create_dir_all(&scratch).unwrap();

        let mut storage = make_storage(&scratch);

        let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
        storage.save_index("test", &payload).unwrap();

        assert!(storage.exists("test"));

        let round_tripped = storage.load_index("test").unwrap();
        assert_eq!(round_tripped, payload);

        storage.delete_index("test").unwrap();
        assert!(!storage.exists("test"));

        std::fs::remove_dir_all(&scratch).ok();
    }

    #[test]
    fn test_hot_tier_storage() {
        let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
        let mut storage = HotTierStorage::new();

        storage.save_index("test", &payload).unwrap();

        assert!(storage.exists("test"));
        assert_eq!(storage.get_size("test").unwrap(), 5);

        let round_tripped = storage.load_index("test").unwrap();
        assert_eq!(round_tripped, payload);

        storage.delete_index("test").unwrap();
        assert!(!storage.exists("test"));
    }

    #[test]
    fn test_warm_tier_storage() {
        check_disk_round_trip("oxirs_warm_tier_test", |dir| {
            WarmTierStorage::new(dir, true, 6).unwrap()
        });
    }

    #[test]
    fn test_cold_tier_storage() {
        check_disk_round_trip("oxirs_cold_tier_test", |dir| {
            ColdTierStorage::new(dir, 19).unwrap()
        });
    }
}