oxirs_vec/tiering/
storage_backends.rs1use anyhow::Result;
4use std::path::{Path, PathBuf};
5
6pub trait StorageBackend: Send + Sync {
8 fn load_index(&self, index_id: &str) -> Result<Vec<u8>>;
10
11 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()>;
13
14 fn delete_index(&mut self, index_id: &str) -> Result<()>;
16
17 fn exists(&self, index_id: &str) -> bool;
19
20 fn get_size(&self, index_id: &str) -> Result<u64>;
22
23 fn list_indices(&self) -> Result<Vec<String>>;
25
26 fn backend_type(&self) -> &'static str;
28}
29
30pub struct HotTierStorage {
32 cache: std::sync::Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<u8>>>>,
34}
35
36impl HotTierStorage {
37 pub fn new() -> Self {
39 Self {
40 cache: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
41 }
42 }
43
44 pub fn memory_usage(&self) -> u64 {
46 let cache = self.cache.lock().unwrap();
47 cache.values().map(|v| v.len() as u64).sum()
48 }
49
50 pub fn cache_size(&self) -> usize {
52 let cache = self.cache.lock().unwrap();
53 cache.len()
54 }
55}
56
57impl Default for HotTierStorage {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl StorageBackend for HotTierStorage {
64 fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
65 let cache = self.cache.lock().unwrap();
66 cache
67 .get(index_id)
68 .cloned()
69 .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
70 }
71
72 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
73 let mut cache = self.cache.lock().unwrap();
74 cache.insert(index_id.to_string(), data.to_vec());
75 Ok(())
76 }
77
78 fn delete_index(&mut self, index_id: &str) -> Result<()> {
79 let mut cache = self.cache.lock().unwrap();
80 cache
81 .remove(index_id)
82 .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))?;
83 Ok(())
84 }
85
86 fn exists(&self, index_id: &str) -> bool {
87 let cache = self.cache.lock().unwrap();
88 cache.contains_key(index_id)
89 }
90
91 fn get_size(&self, index_id: &str) -> Result<u64> {
92 let cache = self.cache.lock().unwrap();
93 cache
94 .get(index_id)
95 .map(|v| v.len() as u64)
96 .ok_or_else(|| anyhow::anyhow!("Index {} not found in hot tier", index_id))
97 }
98
99 fn list_indices(&self) -> Result<Vec<String>> {
100 let cache = self.cache.lock().unwrap();
101 Ok(cache.keys().cloned().collect())
102 }
103
104 fn backend_type(&self) -> &'static str {
105 "HotTier (In-Memory)"
106 }
107}
108
109pub struct WarmTierStorage {
111 base_path: PathBuf,
113 compression_enabled: bool,
115 compression_level: i32,
117}
118
119impl WarmTierStorage {
120 pub fn new<P: AsRef<Path>>(
122 base_path: P,
123 compression_enabled: bool,
124 compression_level: i32,
125 ) -> Result<Self> {
126 let base_path = base_path.as_ref().to_path_buf();
127 std::fs::create_dir_all(&base_path)?;
128
129 Ok(Self {
130 base_path,
131 compression_enabled,
132 compression_level,
133 })
134 }
135
136 fn get_index_path(&self, index_id: &str) -> PathBuf {
138 let filename = if self.compression_enabled {
139 format!("{}.idx.zst", index_id)
140 } else {
141 format!("{}.idx", index_id)
142 };
143 self.base_path.join(filename)
144 }
145}
146
147impl StorageBackend for WarmTierStorage {
148 fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
149 let path = self.get_index_path(index_id);
150 let data = std::fs::read(&path)?;
151
152 if self.compression_enabled {
153 Ok(zstd::decode_all(&data[..])?)
154 } else {
155 Ok(data)
156 }
157 }
158
159 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
160 let path = self.get_index_path(index_id);
161
162 let final_data = if self.compression_enabled {
163 zstd::encode_all(data, self.compression_level)?
164 } else {
165 data.to_vec()
166 };
167
168 std::fs::write(&path, final_data)?;
169 Ok(())
170 }
171
172 fn delete_index(&mut self, index_id: &str) -> Result<()> {
173 let path = self.get_index_path(index_id);
174 std::fs::remove_file(&path)?;
175 Ok(())
176 }
177
178 fn exists(&self, index_id: &str) -> bool {
179 self.get_index_path(index_id).exists()
180 }
181
182 fn get_size(&self, index_id: &str) -> Result<u64> {
183 let path = self.get_index_path(index_id);
184 Ok(std::fs::metadata(&path)?.len())
185 }
186
187 fn list_indices(&self) -> Result<Vec<String>> {
188 let mut indices = Vec::new();
189 for entry in std::fs::read_dir(&self.base_path)? {
190 let entry = entry?;
191 if let Some(filename) = entry.file_name().to_str() {
192 if filename.ends_with(".idx") || filename.ends_with(".idx.zst") {
193 let index_id = filename
194 .trim_end_matches(".idx.zst")
195 .trim_end_matches(".idx")
196 .to_string();
197 indices.push(index_id);
198 }
199 }
200 }
201 Ok(indices)
202 }
203
204 fn backend_type(&self) -> &'static str {
205 "WarmTier (Memory-Mapped)"
206 }
207}
208
209pub struct ColdTierStorage {
211 base_path: PathBuf,
213 compression_level: i32,
215}
216
217impl ColdTierStorage {
218 pub fn new<P: AsRef<Path>>(base_path: P, compression_level: i32) -> Result<Self> {
220 let base_path = base_path.as_ref().to_path_buf();
221 std::fs::create_dir_all(&base_path)?;
222
223 Ok(Self {
224 base_path,
225 compression_level,
226 })
227 }
228
229 fn get_index_path(&self, index_id: &str) -> PathBuf {
231 self.base_path.join(format!("{}.idx.zst", index_id))
232 }
233}
234
235impl StorageBackend for ColdTierStorage {
236 fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
237 let path = self.get_index_path(index_id);
238 let compressed_data = std::fs::read(&path)?;
239 Ok(zstd::decode_all(&compressed_data[..])?)
240 }
241
242 fn save_index(&mut self, index_id: &str, data: &[u8]) -> Result<()> {
243 let path = self.get_index_path(index_id);
244 let compressed_data = zstd::encode_all(data, self.compression_level)?;
245 std::fs::write(&path, compressed_data)?;
246 Ok(())
247 }
248
249 fn delete_index(&mut self, index_id: &str) -> Result<()> {
250 let path = self.get_index_path(index_id);
251 std::fs::remove_file(&path)?;
252 Ok(())
253 }
254
255 fn exists(&self, index_id: &str) -> bool {
256 self.get_index_path(index_id).exists()
257 }
258
259 fn get_size(&self, index_id: &str) -> Result<u64> {
260 let path = self.get_index_path(index_id);
261 Ok(std::fs::metadata(&path)?.len())
262 }
263
264 fn list_indices(&self) -> Result<Vec<String>> {
265 let mut indices = Vec::new();
266 for entry in std::fs::read_dir(&self.base_path)? {
267 let entry = entry?;
268 if let Some(filename) = entry.file_name().to_str() {
269 if filename.ends_with(".idx.zst") {
270 let index_id = filename.trim_end_matches(".idx.zst").to_string();
271 indices.push(index_id);
272 }
273 }
274 }
275 Ok(indices)
276 }
277
278 fn backend_type(&self) -> &'static str {
279 "ColdTier (Compressed Disk)"
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286
287 #[test]
288 fn test_hot_tier_storage() {
289 let mut storage = HotTierStorage::new();
290
291 let data = vec![1, 2, 3, 4, 5];
292 storage.save_index("test", &data).unwrap();
293
294 assert!(storage.exists("test"));
295 assert_eq!(storage.get_size("test").unwrap(), 5);
296
297 let loaded = storage.load_index("test").unwrap();
298 assert_eq!(loaded, data);
299
300 storage.delete_index("test").unwrap();
301 assert!(!storage.exists("test"));
302 }
303
304 #[test]
305 fn test_warm_tier_storage() {
306 use std::env;
307 let temp_dir = env::temp_dir().join("oxirs_warm_tier_test");
308 std::fs::create_dir_all(&temp_dir).unwrap();
309
310 let mut storage = WarmTierStorage::new(&temp_dir, true, 6).unwrap();
311
312 let data = vec![1, 2, 3, 4, 5];
313 storage.save_index("test", &data).unwrap();
314
315 assert!(storage.exists("test"));
316
317 let loaded = storage.load_index("test").unwrap();
318 assert_eq!(loaded, data);
319
320 storage.delete_index("test").unwrap();
321 assert!(!storage.exists("test"));
322
323 std::fs::remove_dir_all(&temp_dir).ok();
324 }
325
326 #[test]
327 fn test_cold_tier_storage() {
328 use std::env;
329 let temp_dir = env::temp_dir().join("oxirs_cold_tier_test");
330 std::fs::create_dir_all(&temp_dir).unwrap();
331
332 let mut storage = ColdTierStorage::new(&temp_dir, 19).unwrap();
333
334 let data = vec![1, 2, 3, 4, 5];
335 storage.save_index("test", &data).unwrap();
336
337 assert!(storage.exists("test"));
338
339 let loaded = storage.load_index("test").unwrap();
340 assert_eq!(loaded, data);
341
342 storage.delete_index("test").unwrap();
343 assert!(!storage.exists("test"));
344
345 std::fs::remove_dir_all(&temp_dir).ok();
346 }
347}