Skip to main content

amaters_core/storage/
lsm_storage.rs

1//! LSM-Tree async storage wrapper
2//!
3//! This module provides an async wrapper around the synchronous LSM-Tree implementation.
4//! All blocking operations are executed on a dedicated thread pool via spawn_blocking.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::secondary_index::{IndexConfig, IndexExtractor, IndexManager, IndexedField};
8use crate::storage::{LsmTree, LsmTreeConfig};
9use crate::traits::StorageEngine;
10use crate::types::{CipherBlob, Key};
11use async_trait::async_trait;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15/// Async wrapper around LSM-Tree storage engine
16///
17/// This wrapper makes the synchronous LSM-Tree usable in async contexts
18/// by running CPU-intensive operations in a blocking thread pool.
19#[derive(Clone)]
20pub struct LsmTreeStorage {
21    /// Inner LSM-Tree wrapped in Arc for thread-safe sharing
22    inner: Arc<LsmTree>,
23    /// Mutex for atomic_update operations
24    update_lock: Arc<Mutex<()>>,
25    /// Optional secondary index manager for automatic index maintenance.
26    index_manager: Option<Arc<IndexManager>>,
27    /// Optional extractor used to derive indexed fields from stored records.
28    index_extractor: Option<Arc<dyn IndexExtractor>>,
29}
30
31impl LsmTreeStorage {
32    /// Create a new LSM-Tree storage with default configuration
33    pub fn new<P: AsRef<std::path::Path>>(data_dir: P) -> Result<Self> {
34        let inner = LsmTree::new(data_dir)?;
35        Ok(Self {
36            inner: Arc::new(inner),
37            update_lock: Arc::new(Mutex::new(())),
38            index_manager: None,
39            index_extractor: None,
40        })
41    }
42
43    /// Create a new LSM-Tree storage with custom configuration
44    pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
45        let inner = LsmTree::with_config(config)?;
46        Ok(Self {
47            inner: Arc::new(inner),
48            update_lock: Arc::new(Mutex::new(())),
49            index_manager: None,
50            index_extractor: None,
51        })
52    }
53
54    /// Attach a secondary index manager for automatic index maintenance.
55    ///
56    /// Must be called before the first `put`/`delete` if index tracking is desired.
57    pub fn with_index_manager(mut self, manager: Arc<IndexManager>) -> Self {
58        self.index_manager = Some(manager);
59        self
60    }
61
62    /// Attach an index extractor for automatic index maintenance.
63    ///
64    /// The extractor is called on every `put`/`delete` to derive the set of
65    /// indexed fields for the affected record.
66    pub fn with_index_extractor(mut self, extractor: Arc<dyn IndexExtractor>) -> Self {
67        self.index_extractor = Some(extractor);
68        self
69    }
70
71    /// Register a secondary index definition.
72    ///
73    /// Requires an attached index manager (see [`Self::with_index_manager`]).
74    pub fn register_index(&self, config: IndexConfig) -> Result<()> {
75        self.index_manager
76            .as_ref()
77            .ok_or_else(|| {
78                AmateRSError::ValidationError(ErrorContext::new(
79                    "No index manager attached; call with_index_manager() first",
80                ))
81            })
82            .and_then(|m| m.create_index(config))
83    }
84
85    /// Access the attached index manager for queries.
86    pub fn index_manager(&self) -> Option<&Arc<IndexManager>> {
87        self.index_manager.as_ref()
88    }
89
90    /// Inner put implementation — writes directly to the LSM-Tree without index updates.
91    async fn put_inner(&self, key: Key, value: CipherBlob) -> Result<()> {
92        let inner = self.inner.clone();
93        tokio::task::spawn_blocking(move || inner.put(key, value))
94            .await
95            .map_err(|e| {
96                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
97            })?
98    }
99
100    /// Inner delete implementation — removes from the LSM-Tree without index updates.
101    async fn delete_inner(&self, key: Key) -> Result<()> {
102        let inner = self.inner.clone();
103        tokio::task::spawn_blocking(move || inner.delete(key))
104            .await
105            .map_err(|e| {
106                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
107            })?
108    }
109
110    /// Validate unique constraints for the fields about to be written.
111    ///
112    /// Delegates to [`IndexManager::check_unique_for_fields`], which inspects
113    /// every unique index that matches the supplied fields and returns a
114    /// `ValidationError` if any conflict is detected before the write occurs.
115    fn validate_unique_constraints(
116        &self,
117        mgr: &IndexManager,
118        key: &Key,
119        new_fields: &[IndexedField],
120    ) -> Result<()> {
121        mgr.check_unique_for_fields(key, new_fields)
122    }
123
124    /// Get statistics from the underlying LSM-Tree
125    pub fn stats(&self) -> crate::storage::LsmTreeStats {
126        self.inner.stats()
127    }
128
129    /// Get level information
130    pub fn level_info(&self, level: usize) -> Option<crate::storage::LevelInfo> {
131        self.inner.level_info(level)
132    }
133
134    /// Get all levels information
135    pub fn all_levels_info(&self) -> Vec<crate::storage::LevelInfo> {
136        self.inner.all_levels_info()
137    }
138}
139
140#[async_trait]
141impl StorageEngine for LsmTreeStorage {
142    async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
143        // Integrity check is always performed first, before acquiring any lock.
144        value.verify_integrity()?;
145
146        if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
147            // Hold the update lock for the entire read-extract-write-index sequence
148            // so no concurrent put/delete can interleave and corrupt the indexes.
149            let _guard = self.update_lock.lock().await;
150
151            // Read the current value so we know which index entries to remove.
152            let old_fields = match self.get(key).await? {
153                Some(old_blob) => ext.extract(key, &old_blob),
154                None => Vec::new(),
155            };
156            let new_fields = ext.extract(key, value);
157
158            // Validate unique constraints before writing (pre-flight check).
159            self.validate_unique_constraints(mgr, key, &new_fields)?;
160
161            // Write data to the LSM-Tree.
162            self.put_inner(key.clone(), value.clone()).await?;
163
164            // Update indexes to reflect the change.
165            mgr.apply_extracted(key, &old_fields, &new_fields)?;
166
167            Ok(())
168        } else {
169            self.put_inner(key.clone(), value.clone()).await
170        }
171    }
172
173    async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
174        let inner = self.inner.clone();
175        let key = key.clone();
176
177        tokio::task::spawn_blocking(move || inner.get(&key))
178            .await
179            .map_err(|e| {
180                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
181            })?
182    }
183
184    async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
185    where
186        F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
187    {
188        // Use lock to ensure atomicity across async calls.
189        // Note: put() will also try to acquire update_lock, but since we call
190        // put_inner() directly here the lock is only held once.
191        let _lock = self.update_lock.lock().await;
192
193        // Read current value.
194        let current = self.get(key).await?;
195        let old_value = current.unwrap_or_else(|| CipherBlob::new(Vec::new()));
196
197        // Apply update function.
198        let new_value = f(&old_value)?;
199        new_value.verify_integrity()?;
200
201        // Write and update indexes while still holding the lock.
202        if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
203            let old_fields = ext.extract(key, &old_value);
204            let new_fields = ext.extract(key, &new_value);
205            self.validate_unique_constraints(mgr, key, &new_fields)?;
206            self.put_inner(key.clone(), new_value).await?;
207            mgr.apply_extracted(key, &old_fields, &new_fields)?;
208        } else {
209            self.put_inner(key.clone(), new_value).await?;
210        }
211
212        Ok(())
213    }
214
215    async fn delete(&self, key: &Key) -> Result<()> {
216        if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
217            // Hold the update lock for the read-delete-index sequence.
218            let _guard = self.update_lock.lock().await;
219
220            // Read the current value so we know which index entries to clean up.
221            let old_fields = match self.get(key).await? {
222                Some(old_blob) => ext.extract(key, &old_blob),
223                None => Vec::new(),
224            };
225
226            // Delete from storage.
227            self.delete_inner(key.clone()).await?;
228
229            // Remove stale index entries.
230            mgr.apply_extracted(key, &old_fields, &[])?;
231
232            Ok(())
233        } else {
234            self.delete_inner(key.clone()).await
235        }
236    }
237
238    async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
239        let inner = self.inner.clone();
240        let start = start.clone();
241        let end = end.clone();
242
243        tokio::task::spawn_blocking(move || inner.range(&start, &end))
244            .await
245            .map_err(|e| {
246                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
247            })?
248    }
249
250    async fn keys(&self) -> Result<Vec<Key>> {
251        let inner = self.inner.clone();
252
253        tokio::task::spawn_blocking(move || inner.keys())
254            .await
255            .map_err(|e| {
256                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
257            })?
258    }
259
260    async fn flush(&self) -> Result<()> {
261        let inner = self.inner.clone();
262
263        tokio::task::spawn_blocking(move || inner.flush())
264            .await
265            .map_err(|e| {
266                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
267            })?
268    }
269
270    async fn close(&self) -> Result<()> {
271        let inner = self.inner.clone();
272
273        tokio::task::spawn_blocking(move || inner.close())
274            .await
275            .map_err(|e| {
276                AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
277            })?
278    }
279}
280
#[cfg(test)]
mod prop_tests {
    use super::*;
    use proptest::prelude::*;
    use tempfile::TempDir;

    /// Failure type returned by the async body of every property below.
    type CaseError = proptest::test_runner::TestCaseError;

    /// Keys of 1–20 characters drawn from `[a-zA-Z0-9_]`.
    fn arb_key() -> impl Strategy<Value = Key> {
        "[a-zA-Z0-9_]{1,20}".prop_map(|text| Key::from_str(&text))
    }

    /// Cipher blobs wrapping 1–256 arbitrary bytes.
    fn arb_blob() -> impl Strategy<Value = CipherBlob> {
        prop::collection::vec(any::<u8>(), 1..=256).prop_map(CipherBlob::new)
    }

    /// Creates the scratch directory and Tokio runtime a single case runs against.
    fn scratch_env() -> (TempDir, tokio::runtime::Runtime) {
        let dir = TempDir::new().expect("create tempdir");
        let rt = tokio::runtime::Runtime::new().expect("create runtime");
        (dir, rt)
    }

    proptest! {
        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_put_get_consistency(key in arb_key(), val in arb_blob()) {
            // Property: put(k,v) → get(k) == Some(v)
            let (dir, rt) = scratch_env();
            rt.block_on(async {
                let store = LsmTreeStorage::new(dir.path()).expect("create storage");
                store.put(&key, &val).await.expect("put");

                let fetched = store.get(&key).await.expect("get");
                prop_assert!(fetched.is_some(), "get after put must return Some");
                let stored = fetched.expect("got is some");
                prop_assert_eq!(
                    stored.as_bytes(),
                    val.as_bytes(),
                    "retrieved value must equal stored value"
                );
                Ok::<(), CaseError>(())
            })?;
        }

        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_delete_removes_key(key in arb_key(), val in arb_blob()) {
            // Property: put(k,v); delete(k) → get(k) == None
            let (dir, rt) = scratch_env();
            rt.block_on(async {
                let store = LsmTreeStorage::new(dir.path()).expect("create storage");
                store.put(&key, &val).await.expect("put");
                store.delete(&key).await.expect("delete");

                let fetched = store.get(&key).await.expect("get after delete");
                prop_assert!(
                    fetched.is_none(),
                    "key must be absent after delete, got {:?}",
                    fetched
                );
                Ok::<(), CaseError>(())
            })?;
        }

        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_overwrite_returns_latest(key in arb_key(), v1 in arb_blob(), v2 in arb_blob()) {
            // Property: put(k,v1); put(k,v2) → get(k) == Some(v2)
            let (dir, rt) = scratch_env();
            rt.block_on(async {
                let store = LsmTreeStorage::new(dir.path()).expect("create storage");
                store.put(&key, &v1).await.expect("put v1");
                store.put(&key, &v2).await.expect("put v2");

                let fetched = store.get(&key).await.expect("get after overwrite");
                prop_assert!(fetched.is_some(), "get after two puts must return Some");
                let stored = fetched.expect("got is some");
                prop_assert_eq!(
                    stored.as_bytes(),
                    v2.as_bytes(),
                    "most recent value must win on overwrite"
                );
                Ok::<(), CaseError>(())
            })?;
        }

        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_range_returns_sorted_results(
            keys in prop::collection::vec(arb_key(), 2..=10),
            val in arb_blob()
        ) {
            // Property: a range scan yields keys in lexicographic order.
            let (dir, rt) = scratch_env();
            rt.block_on(async {
                let store = LsmTreeStorage::new(dir.path()).expect("create storage");
                for k in &keys {
                    store.put(k, &val).await.expect("put");
                }

                // Bounds chosen to enclose every key the strategy can generate.
                let lower = Key::from_str("\x00");
                let upper = Key::from_slice(&[0xFF; 32]);
                let scanned = store.range(&lower, &upper).await.expect("range scan");
                let ordered: Vec<Key> = scanned.into_iter().map(|(k, _)| k).collect();

                // Each adjacent pair must be in non-decreasing order.
                for pair in ordered.windows(2) {
                    prop_assert!(
                        pair[0] <= pair[1],
                        "range results not sorted: {:?} > {:?}",
                        pair[0],
                        pair[1]
                    );
                }
                Ok::<(), CaseError>(())
            })?;
        }

        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_contains_consistent_with_get(key in arb_key(), val in arb_blob()) {
            // Property: contains(k) ↔ get(k).is_some(), both before and after put.
            let (dir, rt) = scratch_env();
            rt.block_on(async {
                let store = LsmTreeStorage::new(dir.path()).expect("create storage");

                // Before the write, both views must agree on the key's presence.
                let present_before = store.contains(&key).await.expect("contains before put");
                let fetched_before = store.get(&key).await.expect("get before put");
                prop_assert_eq!(
                    present_before,
                    fetched_before.is_some(),
                    "contains/get must agree before put"
                );

                // After the write, both views must still agree.
                store.put(&key, &val).await.expect("put");
                let present_after = store.contains(&key).await.expect("contains after put");
                let fetched_after = store.get(&key).await.expect("get after put");
                prop_assert_eq!(
                    present_after,
                    fetched_after.is_some(),
                    "contains/get must agree after put"
                );
                Ok::<(), CaseError>(())
            })?;
        }
    }
}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::secondary_index::{IndexConfig, IndexType};
    use tempfile::TempDir;

    // -------------------------------------------------------------------------
    // Test extractor: indexes the raw blob bytes under ("test_col", "data").
    // -------------------------------------------------------------------------
    #[derive(Debug)]
    struct TestExtractor;

    impl IndexExtractor for TestExtractor {
        fn extract(&self, _key: &Key, value: &CipherBlob) -> Vec<IndexedField> {
            vec![IndexedField {
                collection: "test_col".to_string(),
                field_name: "data".to_string(),
                value: value.as_bytes().to_vec(),
            }]
        }
    }

    /// Builds an `LsmTreeStorage` with a `TestExtractor` and one BTree index
    /// for `("test_col", "data")` inside a fresh, uniquely named temp directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the storage is
    /// used; dropping it removes the directory, including when a test fails
    /// or returns early.
    fn open_indexed_storage() -> Result<(TempDir, LsmTreeStorage)> {
        let dir = TempDir::new()?;

        let mgr = Arc::new(IndexManager::new());
        mgr.create_index(IndexConfig {
            name: "idx_test_col_data".to_string(),
            collection: "test_col".to_string(),
            field_name: "data".to_string(),
            index_type: IndexType::BTree,
            unique: false,
        })?;

        // Use a wal subdirectory under the temp directory so that concurrent
        // tests do not share the default relative `./wal` path, which causes
        // race conditions under nextest's parallel test execution.
        let config = LsmTreeConfig {
            data_dir: dir.path().to_path_buf(),
            wal_dir: dir.path().join("wal"),
            ..Default::default()
        };
        let storage = LsmTreeStorage::with_config(config)?
            .with_index_manager(mgr)
            .with_index_extractor(Arc::new(TestExtractor));

        Ok((dir, storage))
    }

    /// Builds an `LsmTreeStorage` without indexing via `LsmTreeStorage::new`
    /// inside a fresh, uniquely named temp directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the storage is
    /// used; dropping it removes the directory.
    fn open_plain_storage() -> Result<(TempDir, LsmTreeStorage)> {
        let dir = TempDir::new()?;
        let storage = LsmTreeStorage::new(dir.path())?;
        Ok((dir, storage))
    }

    /// Number of entries the test index holds for `value` (0 without a manager or index).
    fn lookup_count(storage: &LsmTreeStorage, value: &[u8]) -> usize {
        storage
            .index_manager()
            .and_then(|m| m.with_index("idx_test_col_data", |idx| idx.lookup(value).len()))
            .unwrap_or(0)
    }

    #[tokio::test]
    async fn test_auto_index_on_put() -> Result<()> {
        let (_dir, storage) = open_indexed_storage()?;

        let key = Key::from_str("rec_1");
        let value = CipherBlob::new(b"alice".to_vec());
        storage.put(&key, &value).await?;

        // Index lookup must find the key WITHOUT calling update_indexes manually.
        assert_eq!(
            lookup_count(&storage, b"alice"),
            1,
            "index should contain one entry after put"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_auto_index_updates_on_overwrite() -> Result<()> {
        let (_dir, storage) = open_indexed_storage()?;

        let key = Key::from_str("rec_1");

        // First write: value = "alice"
        storage
            .put(&key, &CipherBlob::new(b"alice".to_vec()))
            .await?;
        assert_eq!(lookup_count(&storage, b"alice"), 1);

        // Overwrite with "bob"
        storage.put(&key, &CipherBlob::new(b"bob".to_vec())).await?;

        assert_eq!(
            lookup_count(&storage, b"alice"),
            0,
            "old value index entry should be removed on overwrite"
        );
        assert_eq!(
            lookup_count(&storage, b"bob"),
            1,
            "new value index entry should be present after overwrite"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_auto_index_on_delete() -> Result<()> {
        let (_dir, storage) = open_indexed_storage()?;

        let key = Key::from_str("rec_1");
        storage
            .put(&key, &CipherBlob::new(b"alice".to_vec()))
            .await?;
        assert_eq!(lookup_count(&storage, b"alice"), 1);

        // Delete the record
        storage.delete(&key).await?;

        assert_eq!(
            lookup_count(&storage, b"alice"),
            0,
            "index entry should be removed on delete"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_no_index_manager_noop() -> Result<()> {
        // LsmTreeStorage without an index manager: put/delete should still work normally.
        let (_dir, storage) = open_plain_storage()?;

        let key = Key::from_str("k");
        let value = CipherBlob::new(b"v".to_vec());

        storage.put(&key, &value).await?;
        assert_eq!(storage.get(&key).await?, Some(value));

        storage.delete(&key).await?;
        assert_eq!(storage.get(&key).await?, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_basic() -> Result<()> {
        let (_dir, storage) = open_plain_storage()?;

        // Put
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        storage.put(&key, &value).await?;

        // Get
        let retrieved = storage.get(&key).await?;
        assert_eq!(retrieved, Some(value.clone()));

        // Delete
        storage.delete(&key).await?;
        let retrieved = storage.get(&key).await?;
        assert_eq!(retrieved, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_range() -> Result<()> {
        let (_dir, storage) = open_plain_storage()?;

        // Insert keys
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8]);
            storage.put(&key, &value).await?;
        }

        // Range scan
        let start = Key::from_str("key_003");
        let end = Key::from_str("key_007");
        let results = storage.range(&start, &end).await?;

        assert!(!results.is_empty());

        // Whether or not `end` is inclusive, nothing outside the requested
        // bounds may be returned.
        for (key, _) in &results {
            assert!(
                key >= &start && key <= &end,
                "range returned out-of-bounds key {:?}",
                key
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_atomic_update() -> Result<()> {
        let (_dir, storage) = open_plain_storage()?;
        let key = Key::from_str("counter");
        let initial = CipherBlob::new(vec![0]);

        storage.put(&key, &initial).await?;

        // Atomic increment
        storage
            .atomic_update(&key, |old| {
                let mut data = old.to_vec();
                if let Some(first) = data.first_mut() {
                    *first += 1;
                }
                Ok(CipherBlob::new(data))
            })
            .await?;

        let updated = storage.get(&key).await?.ok_or_else(|| {
            AmateRSError::KeyNotFound(ErrorContext::new("Key not found".to_string()))
        })?;
        assert_eq!(updated.as_bytes()[0], 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_keys() -> Result<()> {
        let (_dir, storage) = open_plain_storage()?;

        // Insert keys
        for i in 0..5 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8]);
            storage.put(&key, &value).await?;
        }

        // Get all keys
        let keys = storage.keys().await?;
        assert_eq!(keys.len(), 5);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_flush_and_close() -> Result<()> {
        let (_dir, storage) = open_plain_storage()?;

        // Write data
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3]);
        storage.put(&key, &value).await?;

        // Flush
        storage.flush().await?;

        // Close
        storage.close().await?;

        Ok(())
    }
}