amaters_core/storage/
lsm_storage.rs1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::{LsmTree, LsmTreeConfig};
8use crate::traits::StorageEngine;
9use crate::types::{CipherBlob, Key};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14#[derive(Clone)]
19pub struct LsmTreeStorage {
20 inner: Arc<LsmTree>,
22 update_lock: Arc<Mutex<()>>,
24}
25
26impl LsmTreeStorage {
27 pub fn new<P: AsRef<std::path::Path>>(data_dir: P) -> Result<Self> {
29 let inner = LsmTree::new(data_dir)?;
30 Ok(Self {
31 inner: Arc::new(inner),
32 update_lock: Arc::new(Mutex::new(())),
33 })
34 }
35
36 pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
38 let inner = LsmTree::with_config(config)?;
39 Ok(Self {
40 inner: Arc::new(inner),
41 update_lock: Arc::new(Mutex::new(())),
42 })
43 }
44
45 pub fn stats(&self) -> crate::storage::LsmTreeStats {
47 self.inner.stats()
48 }
49
50 pub fn level_info(&self, level: usize) -> Option<crate::storage::LevelInfo> {
52 self.inner.level_info(level)
53 }
54
55 pub fn all_levels_info(&self) -> Vec<crate::storage::LevelInfo> {
57 self.inner.all_levels_info()
58 }
59}
60
61#[async_trait]
62impl StorageEngine for LsmTreeStorage {
63 async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
64 value.verify_integrity()?;
66
67 let inner = self.inner.clone();
68 let key = key.clone();
69 let value = value.clone();
70
71 tokio::task::spawn_blocking(move || inner.put(key, value))
72 .await
73 .map_err(|e| {
74 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
75 })?
76 }
77
78 async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
79 let inner = self.inner.clone();
80 let key = key.clone();
81
82 tokio::task::spawn_blocking(move || inner.get(&key))
83 .await
84 .map_err(|e| {
85 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
86 })?
87 }
88
89 async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
90 where
91 F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
92 {
93 let _lock = self.update_lock.lock().await;
95
96 let current = self.get(key).await?;
98 let old_value = current.unwrap_or_else(|| CipherBlob::new(Vec::new()));
99
100 let new_value = f(&old_value)?;
102 new_value.verify_integrity()?;
103
104 self.put(key, &new_value).await?;
106
107 Ok(())
108 }
109
110 async fn delete(&self, key: &Key) -> Result<()> {
111 let inner = self.inner.clone();
112 let key = key.clone();
113
114 tokio::task::spawn_blocking(move || inner.delete(key))
115 .await
116 .map_err(|e| {
117 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
118 })?
119 }
120
121 async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
122 let inner = self.inner.clone();
123 let start = start.clone();
124 let end = end.clone();
125
126 tokio::task::spawn_blocking(move || inner.range(&start, &end))
127 .await
128 .map_err(|e| {
129 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
130 })?
131 }
132
133 async fn keys(&self) -> Result<Vec<Key>> {
134 let inner = self.inner.clone();
135
136 tokio::task::spawn_blocking(move || inner.keys())
137 .await
138 .map_err(|e| {
139 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
140 })?
141 }
142
143 async fn flush(&self) -> Result<()> {
144 let inner = self.inner.clone();
145
146 tokio::task::spawn_blocking(move || inner.flush())
147 .await
148 .map_err(|e| {
149 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
150 })?
151 }
152
153 async fn close(&self) -> Result<()> {
154 let inner = self.inner.clone();
155
156 tokio::task::spawn_blocking(move || inner.close())
157 .await
158 .map_err(|e| {
159 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
160 })?
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use super::*;
167 use std::env;
168
169 #[tokio::test]
170 async fn test_lsm_storage_basic() -> Result<()> {
171 let dir = env::temp_dir().join("test_lsm_storage_basic");
172 if dir.exists() {
173 std::fs::remove_dir_all(&dir).ok();
174 }
175 std::fs::create_dir_all(&dir).ok();
176
177 let storage = LsmTreeStorage::new(&dir)?;
178
179 let key = Key::from_str("test_key");
181 let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
182 storage.put(&key, &value).await?;
183
184 let retrieved = storage.get(&key).await?;
186 assert_eq!(retrieved, Some(value.clone()));
187
188 storage.delete(&key).await?;
190 let retrieved = storage.get(&key).await?;
191 assert_eq!(retrieved, None);
192
193 std::fs::remove_dir_all(&dir).ok();
195 Ok(())
196 }
197
198 #[tokio::test]
199 async fn test_lsm_storage_range() -> Result<()> {
200 let dir = env::temp_dir().join("test_lsm_storage_range");
201 if dir.exists() {
202 std::fs::remove_dir_all(&dir).ok();
203 }
204 std::fs::create_dir_all(&dir).ok();
205
206 let storage = LsmTreeStorage::new(&dir)?;
207
208 for i in 0..10 {
210 let key = Key::from_str(&format!("key_{:03}", i));
211 let value = CipherBlob::new(vec![i as u8]);
212 storage.put(&key, &value).await?;
213 }
214
215 let start = Key::from_str("key_003");
217 let end = Key::from_str("key_007");
218 let results = storage.range(&start, &end).await?;
219
220 assert!(!results.is_empty());
221
222 std::fs::remove_dir_all(&dir).ok();
224 Ok(())
225 }
226
227 #[tokio::test]
228 async fn test_lsm_storage_atomic_update() -> Result<()> {
229 let dir = env::temp_dir().join("test_lsm_storage_atomic");
230 if dir.exists() {
231 std::fs::remove_dir_all(&dir).ok();
232 }
233 std::fs::create_dir_all(&dir).ok();
234
235 let storage = LsmTreeStorage::new(&dir)?;
236 let key = Key::from_str("counter");
237 let initial = CipherBlob::new(vec![0]);
238
239 storage.put(&key, &initial).await?;
240
241 storage
243 .atomic_update(&key, |old| {
244 let mut data = old.to_vec();
245 if !data.is_empty() {
246 data[0] += 1;
247 }
248 Ok(CipherBlob::new(data))
249 })
250 .await?;
251
252 let result = storage.get(&key).await?;
253 assert_eq!(
254 result
255 .ok_or_else(|| AmateRSError::KeyNotFound(ErrorContext::new(
256 "Key not found".to_string()
257 )))?
258 .as_bytes()[0],
259 1
260 );
261
262 std::fs::remove_dir_all(&dir).ok();
264 Ok(())
265 }
266
267 #[tokio::test]
268 async fn test_lsm_storage_keys() -> Result<()> {
269 let dir = env::temp_dir().join("test_lsm_storage_keys");
270 if dir.exists() {
271 std::fs::remove_dir_all(&dir).ok();
272 }
273 std::fs::create_dir_all(&dir).ok();
274
275 let storage = LsmTreeStorage::new(&dir)?;
276
277 for i in 0..5 {
279 let key = Key::from_str(&format!("key_{}", i));
280 let value = CipherBlob::new(vec![i as u8]);
281 storage.put(&key, &value).await?;
282 }
283
284 let keys = storage.keys().await?;
286 assert_eq!(keys.len(), 5);
287
288 std::fs::remove_dir_all(&dir).ok();
290 Ok(())
291 }
292
293 #[tokio::test]
294 async fn test_lsm_storage_flush_and_close() -> Result<()> {
295 let dir = env::temp_dir().join("test_lsm_storage_flush");
296 if dir.exists() {
297 std::fs::remove_dir_all(&dir).ok();
298 }
299 std::fs::create_dir_all(&dir).ok();
300
301 let storage = LsmTreeStorage::new(&dir)?;
302
303 let key = Key::from_str("test_key");
305 let value = CipherBlob::new(vec![1, 2, 3]);
306 storage.put(&key, &value).await?;
307
308 storage.flush().await?;
310
311 storage.close().await?;
313
314 std::fs::remove_dir_all(&dir).ok();
316 Ok(())
317 }
318}