amaters_core/storage/
lsm_storage.rs

1//! LSM-Tree async storage wrapper
2//!
3//! This module provides an async wrapper around the synchronous LSM-Tree implementation.
4//! All blocking operations are executed on a dedicated thread pool via spawn_blocking.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::{LsmTree, LsmTreeConfig};
8use crate::traits::StorageEngine;
9use crate::types::{CipherBlob, Key};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14/// Async wrapper around LSM-Tree storage engine
15///
16/// This wrapper makes the synchronous LSM-Tree usable in async contexts
17/// by running CPU-intensive operations in a blocking thread pool.
18#[derive(Clone)]
19pub struct LsmTreeStorage {
20    /// Inner LSM-Tree wrapped in Arc for thread-safe sharing
21    inner: Arc<LsmTree>,
22    /// Mutex for atomic_update operations
23    update_lock: Arc<Mutex<()>>,
24}
25
26impl LsmTreeStorage {
27    /// Create a new LSM-Tree storage with default configuration
28    pub fn new<P: AsRef<std::path::Path>>(data_dir: P) -> Result<Self> {
29        let inner = LsmTree::new(data_dir)?;
30        Ok(Self {
31            inner: Arc::new(inner),
32            update_lock: Arc::new(Mutex::new(())),
33        })
34    }
35
36    /// Create a new LSM-Tree storage with custom configuration
37    pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
38        let inner = LsmTree::with_config(config)?;
39        Ok(Self {
40            inner: Arc::new(inner),
41            update_lock: Arc::new(Mutex::new(())),
42        })
43    }
44
45    /// Get statistics from the underlying LSM-Tree
46    pub fn stats(&self) -> crate::storage::LsmTreeStats {
47        self.inner.stats()
48    }
49
50    /// Get level information
51    pub fn level_info(&self, level: usize) -> Option<crate::storage::LevelInfo> {
52        self.inner.level_info(level)
53    }
54
55    /// Get all levels information
56    pub fn all_levels_info(&self) -> Vec<crate::storage::LevelInfo> {
57        self.inner.all_levels_info()
58    }
59}
60
61#[async_trait]
62impl StorageEngine for LsmTreeStorage {
63    async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
64        // Verify integrity before storing
65        value.verify_integrity()?;
66
67        let inner = self.inner.clone();
68        let key = key.clone();
69        let value = value.clone();
70
71        tokio::task::spawn_blocking(move || inner.put(key, value))
72            .await
73            .map_err(|e| {
74                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
75            })?
76    }
77
78    async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
79        let inner = self.inner.clone();
80        let key = key.clone();
81
82        tokio::task::spawn_blocking(move || inner.get(&key))
83            .await
84            .map_err(|e| {
85                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
86            })?
87    }
88
89    async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
90    where
91        F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
92    {
93        // Use lock to ensure atomicity across async calls
94        let _lock = self.update_lock.lock().await;
95
96        // Read current value
97        let current = self.get(key).await?;
98        let old_value = current.unwrap_or_else(|| CipherBlob::new(Vec::new()));
99
100        // Apply update function
101        let new_value = f(&old_value)?;
102        new_value.verify_integrity()?;
103
104        // Write new value
105        self.put(key, &new_value).await?;
106
107        Ok(())
108    }
109
110    async fn delete(&self, key: &Key) -> Result<()> {
111        let inner = self.inner.clone();
112        let key = key.clone();
113
114        tokio::task::spawn_blocking(move || inner.delete(key))
115            .await
116            .map_err(|e| {
117                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
118            })?
119    }
120
121    async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
122        let inner = self.inner.clone();
123        let start = start.clone();
124        let end = end.clone();
125
126        tokio::task::spawn_blocking(move || inner.range(&start, &end))
127            .await
128            .map_err(|e| {
129                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
130            })?
131    }
132
133    async fn keys(&self) -> Result<Vec<Key>> {
134        let inner = self.inner.clone();
135
136        tokio::task::spawn_blocking(move || inner.keys())
137            .await
138            .map_err(|e| {
139                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
140            })?
141    }
142
143    async fn flush(&self) -> Result<()> {
144        let inner = self.inner.clone();
145
146        tokio::task::spawn_blocking(move || inner.flush())
147            .await
148            .map_err(|e| {
149                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
150            })?
151    }
152
153    async fn close(&self) -> Result<()> {
154        let inner = self.inner.clone();
155
156        tokio::task::spawn_blocking(move || inner.close())
157            .await
158            .map_err(|e| {
159                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
160            })?
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Scratch directory for a single test.
    ///
    /// The directory name includes the process id so that two test processes
    /// running at the same time do not share (and delete) each other's data.
    /// The directory is removed when the guard is dropped, which also covers
    /// tests that return early through `?`.
    struct TestDir(PathBuf);

    impl TestDir {
        /// Creates an empty directory under the system temp dir for test `name`.
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir().join(format!("{}_{}", name, std::process::id()));
            // Best effort: clears leftovers from an aborted run; a missing
            // directory is the normal case.
            let _ = std::fs::remove_dir_all(&path);
            std::fs::create_dir_all(&path).expect("failed to create test directory");
            Self(path)
        }

        /// Path of the scratch directory.
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // In every test the `TestDir` is declared before the storage. Locals are
    // dropped in reverse declaration order, so the storage is gone before the
    // directory is removed.

    #[tokio::test]
    async fn test_lsm_storage_basic() -> Result<()> {
        let dir = TestDir::new("test_lsm_storage_basic");
        let storage = LsmTreeStorage::new(dir.path())?;

        // Put
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        storage.put(&key, &value).await?;

        // Get
        let retrieved = storage.get(&key).await?;
        assert_eq!(retrieved, Some(value.clone()));

        // Delete
        storage.delete(&key).await?;
        let retrieved = storage.get(&key).await?;
        assert_eq!(retrieved, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_range() -> Result<()> {
        let dir = TestDir::new("test_lsm_storage_range");
        let storage = LsmTreeStorage::new(dir.path())?;

        // Insert keys key_000 ..= key_009
        for i in 0..10u8 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i]);
            storage.put(&key, &value).await?;
        }

        // Range scan
        let start = Key::from_str("key_003");
        let end = Key::from_str("key_007");
        let results = storage.range(&start, &end).await?;

        assert!(!results.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_atomic_update() -> Result<()> {
        let dir = TestDir::new("test_lsm_storage_atomic");
        let storage = LsmTreeStorage::new(dir.path())?;

        let key = Key::from_str("counter");
        let initial = CipherBlob::new(vec![0]);
        storage.put(&key, &initial).await?;

        // Atomic increment of the first byte
        storage
            .atomic_update(&key, |old| {
                let mut data = old.to_vec();
                if let Some(first) = data.first_mut() {
                    *first += 1;
                }
                Ok(CipherBlob::new(data))
            })
            .await?;

        let result = storage.get(&key).await?;
        assert_eq!(
            result
                .ok_or_else(|| AmateRSError::KeyNotFound(ErrorContext::new(
                    "Key not found".to_string()
                )))?
                .as_bytes()[0],
            1
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_keys() -> Result<()> {
        let dir = TestDir::new("test_lsm_storage_keys");
        let storage = LsmTreeStorage::new(dir.path())?;

        // Insert keys
        for i in 0..5u8 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i]);
            storage.put(&key, &value).await?;
        }

        // Get all keys
        let keys = storage.keys().await?;
        assert_eq!(keys.len(), 5);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_flush_and_close() -> Result<()> {
        let dir = TestDir::new("test_lsm_storage_flush");
        let storage = LsmTreeStorage::new(dir.path())?;

        // Write data
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3]);
        storage.put(&key, &value).await?;

        // Flush
        storage.flush().await?;

        // Close
        storage.close().await?;

        Ok(())
    }
}