1use crate::error::{AmateRSError, ErrorContext, Result};
7use crate::storage::secondary_index::{IndexConfig, IndexExtractor, IndexManager, IndexedField};
8use crate::storage::{LsmTree, LsmTreeConfig};
9use crate::traits::StorageEngine;
10use crate::types::{CipherBlob, Key};
11use async_trait::async_trait;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[derive(Clone)]
20pub struct LsmTreeStorage {
21 inner: Arc<LsmTree>,
23 update_lock: Arc<Mutex<()>>,
25 index_manager: Option<Arc<IndexManager>>,
27 index_extractor: Option<Arc<dyn IndexExtractor>>,
29}
30
31impl LsmTreeStorage {
32 pub fn new<P: AsRef<std::path::Path>>(data_dir: P) -> Result<Self> {
34 let inner = LsmTree::new(data_dir)?;
35 Ok(Self {
36 inner: Arc::new(inner),
37 update_lock: Arc::new(Mutex::new(())),
38 index_manager: None,
39 index_extractor: None,
40 })
41 }
42
43 pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
45 let inner = LsmTree::with_config(config)?;
46 Ok(Self {
47 inner: Arc::new(inner),
48 update_lock: Arc::new(Mutex::new(())),
49 index_manager: None,
50 index_extractor: None,
51 })
52 }
53
54 pub fn with_index_manager(mut self, manager: Arc<IndexManager>) -> Self {
58 self.index_manager = Some(manager);
59 self
60 }
61
62 pub fn with_index_extractor(mut self, extractor: Arc<dyn IndexExtractor>) -> Self {
67 self.index_extractor = Some(extractor);
68 self
69 }
70
71 pub fn register_index(&self, config: IndexConfig) -> Result<()> {
75 self.index_manager
76 .as_ref()
77 .ok_or_else(|| {
78 AmateRSError::ValidationError(ErrorContext::new(
79 "No index manager attached; call with_index_manager() first",
80 ))
81 })
82 .and_then(|m| m.create_index(config))
83 }
84
85 pub fn index_manager(&self) -> Option<&Arc<IndexManager>> {
87 self.index_manager.as_ref()
88 }
89
90 async fn put_inner(&self, key: Key, value: CipherBlob) -> Result<()> {
92 let inner = self.inner.clone();
93 tokio::task::spawn_blocking(move || inner.put(key, value))
94 .await
95 .map_err(|e| {
96 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
97 })?
98 }
99
100 async fn delete_inner(&self, key: Key) -> Result<()> {
102 let inner = self.inner.clone();
103 tokio::task::spawn_blocking(move || inner.delete(key))
104 .await
105 .map_err(|e| {
106 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
107 })?
108 }
109
110 fn validate_unique_constraints(
116 &self,
117 mgr: &IndexManager,
118 key: &Key,
119 new_fields: &[IndexedField],
120 ) -> Result<()> {
121 mgr.check_unique_for_fields(key, new_fields)
122 }
123
124 pub fn stats(&self) -> crate::storage::LsmTreeStats {
126 self.inner.stats()
127 }
128
129 pub fn level_info(&self, level: usize) -> Option<crate::storage::LevelInfo> {
131 self.inner.level_info(level)
132 }
133
134 pub fn all_levels_info(&self) -> Vec<crate::storage::LevelInfo> {
136 self.inner.all_levels_info()
137 }
138}
139
140#[async_trait]
141impl StorageEngine for LsmTreeStorage {
142 async fn put(&self, key: &Key, value: &CipherBlob) -> Result<()> {
143 value.verify_integrity()?;
145
146 if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
147 let _guard = self.update_lock.lock().await;
150
151 let old_fields = match self.get(key).await? {
153 Some(old_blob) => ext.extract(key, &old_blob),
154 None => Vec::new(),
155 };
156 let new_fields = ext.extract(key, value);
157
158 self.validate_unique_constraints(mgr, key, &new_fields)?;
160
161 self.put_inner(key.clone(), value.clone()).await?;
163
164 mgr.apply_extracted(key, &old_fields, &new_fields)?;
166
167 Ok(())
168 } else {
169 self.put_inner(key.clone(), value.clone()).await
170 }
171 }
172
173 async fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
174 let inner = self.inner.clone();
175 let key = key.clone();
176
177 tokio::task::spawn_blocking(move || inner.get(&key))
178 .await
179 .map_err(|e| {
180 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
181 })?
182 }
183
184 async fn atomic_update<F>(&self, key: &Key, f: F) -> Result<()>
185 where
186 F: Fn(&CipherBlob) -> Result<CipherBlob> + Send + Sync,
187 {
188 let _lock = self.update_lock.lock().await;
192
193 let current = self.get(key).await?;
195 let old_value = current.unwrap_or_else(|| CipherBlob::new(Vec::new()));
196
197 let new_value = f(&old_value)?;
199 new_value.verify_integrity()?;
200
201 if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
203 let old_fields = ext.extract(key, &old_value);
204 let new_fields = ext.extract(key, &new_value);
205 self.validate_unique_constraints(mgr, key, &new_fields)?;
206 self.put_inner(key.clone(), new_value).await?;
207 mgr.apply_extracted(key, &old_fields, &new_fields)?;
208 } else {
209 self.put_inner(key.clone(), new_value).await?;
210 }
211
212 Ok(())
213 }
214
215 async fn delete(&self, key: &Key) -> Result<()> {
216 if let (Some(mgr), Some(ext)) = (&self.index_manager, &self.index_extractor) {
217 let _guard = self.update_lock.lock().await;
219
220 let old_fields = match self.get(key).await? {
222 Some(old_blob) => ext.extract(key, &old_blob),
223 None => Vec::new(),
224 };
225
226 self.delete_inner(key.clone()).await?;
228
229 mgr.apply_extracted(key, &old_fields, &[])?;
231
232 Ok(())
233 } else {
234 self.delete_inner(key.clone()).await
235 }
236 }
237
238 async fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
239 let inner = self.inner.clone();
240 let start = start.clone();
241 let end = end.clone();
242
243 tokio::task::spawn_blocking(move || inner.range(&start, &end))
244 .await
245 .map_err(|e| {
246 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
247 })?
248 }
249
250 async fn keys(&self) -> Result<Vec<Key>> {
251 let inner = self.inner.clone();
252
253 tokio::task::spawn_blocking(move || inner.keys())
254 .await
255 .map_err(|e| {
256 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
257 })?
258 }
259
260 async fn flush(&self) -> Result<()> {
261 let inner = self.inner.clone();
262
263 tokio::task::spawn_blocking(move || inner.flush())
264 .await
265 .map_err(|e| {
266 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
267 })?
268 }
269
270 async fn close(&self) -> Result<()> {
271 let inner = self.inner.clone();
272
273 tokio::task::spawn_blocking(move || inner.close())
274 .await
275 .map_err(|e| {
276 AmateRSError::IoError(ErrorContext::new(format!("Task join error: {}", e)))
277 })?
278 }
279}
280
#[cfg(test)]
mod prop_tests {
    use super::*;
    use proptest::prelude::*;
    use tempfile::TempDir;

    /// Outcome of one property-test case.
    type CaseResult = std::result::Result<(), proptest::test_runner::TestCaseError>;

    /// Strategy producing keys of 1 to 20 characters drawn from `[a-zA-Z0-9_]`.
    fn arb_key() -> impl Strategy<Value = Key> {
        "[a-zA-Z0-9_]{1,20}".prop_map(|text| Key::from_str(&text))
    }

    /// Strategy producing blobs built from 1 to 256 arbitrary bytes.
    fn arb_blob() -> impl Strategy<Value = CipherBlob> {
        prop::collection::vec(any::<u8>(), 1..=256).prop_map(CipherBlob::new)
    }

    /// Drives one async test case to completion on a freshly created runtime.
    fn run_case<F>(case: F) -> CaseResult
    where
        F: std::future::Future<Output = CaseResult>,
    {
        let runtime = tokio::runtime::Runtime::new().expect("create runtime");
        runtime.block_on(case)
    }

    /// Opens a storage engine rooted at `dir`; called from inside the runtime.
    fn open_storage(dir: &TempDir) -> LsmTreeStorage {
        LsmTreeStorage::new(dir.path()).expect("create storage")
    }

    proptest! {
        // A value that was just written must be readable and unchanged.
        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_put_get_consistency(key in arb_key(), val in arb_blob()) {
            let dir = TempDir::new().expect("create tempdir");
            run_case(async {
                let storage = open_storage(&dir);
                storage.put(&key, &val).await.expect("put");
                let fetched = storage.get(&key).await.expect("get");
                prop_assert!(fetched.is_some(), "get after put must return Some");
                let stored = fetched.expect("got is some");
                prop_assert_eq!(
                    stored.as_bytes(),
                    val.as_bytes(),
                    "retrieved value must equal stored value"
                );
                Ok::<(), proptest::test_runner::TestCaseError>(())
            })?;
        }

        // A key must not be readable once it has been deleted.
        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_delete_removes_key(key in arb_key(), val in arb_blob()) {
            let dir = TempDir::new().expect("create tempdir");
            run_case(async {
                let storage = open_storage(&dir);
                storage.put(&key, &val).await.expect("put");
                storage.delete(&key).await.expect("delete");
                let fetched = storage.get(&key).await.expect("get after delete");
                prop_assert!(
                    fetched.is_none(),
                    "key must be absent after delete, got {:?}",
                    fetched
                );
                Ok::<(), proptest::test_runner::TestCaseError>(())
            })?;
        }

        // Writing the same key twice must leave the second value visible.
        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_overwrite_returns_latest(key in arb_key(), v1 in arb_blob(), v2 in arb_blob()) {
            let dir = TempDir::new().expect("create tempdir");
            run_case(async {
                let storage = open_storage(&dir);
                storage.put(&key, &v1).await.expect("put v1");
                storage.put(&key, &v2).await.expect("put v2");
                let fetched = storage.get(&key).await.expect("get after overwrite");
                prop_assert!(fetched.is_some(), "get after two puts must return Some");
                let stored = fetched.expect("got is some");
                prop_assert_eq!(
                    stored.as_bytes(),
                    v2.as_bytes(),
                    "most recent value must win on overwrite"
                );
                Ok::<(), proptest::test_runner::TestCaseError>(())
            })?;
        }

        // A wide range scan must hand keys back in non-decreasing order.
        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_range_returns_sorted_results(
            keys in prop::collection::vec(arb_key(), 2..=10),
            val in arb_blob()
        ) {
            let dir = TempDir::new().expect("create tempdir");
            run_case(async {
                let storage = open_storage(&dir);
                for k in &keys {
                    storage.put(k, &val).await.expect("put");
                }
                // Bounds chosen to sit below and above every generated key.
                let lower = Key::from_str("\x00");
                let upper = Key::from_slice(&[0xFF; 32]);
                let scanned = storage.range(&lower, &upper).await.expect("range scan");
                let scanned_keys: Vec<Key> = scanned.into_iter().map(|(k, _)| k).collect();
                for pair in scanned_keys.windows(2) {
                    prop_assert!(
                        pair[0] <= pair[1],
                        "range results not sorted: {:?} > {:?}",
                        pair[0],
                        pair[1]
                    );
                }
                Ok::<(), proptest::test_runner::TestCaseError>(())
            })?;
        }

        // `contains` and `get` must agree both before and after a write.
        #[ignore = "slow: disk I/O per proptest case, >180s; run manually with cargo test -- --ignored"]
        #[test]
        fn prop_contains_consistent_with_get(key in arb_key(), val in arb_blob()) {
            let dir = TempDir::new().expect("create tempdir");
            run_case(async {
                let storage = open_storage(&dir);
                let present_before = storage.contains(&key).await.expect("contains before put");
                let value_before = storage.get(&key).await.expect("get before put");
                prop_assert_eq!(
                    present_before,
                    value_before.is_some(),
                    "contains/get must agree before put"
                );
                storage.put(&key, &val).await.expect("put");
                let present_after = storage.contains(&key).await.expect("contains after put");
                let value_after = storage.get(&key).await.expect("get after put");
                prop_assert_eq!(
                    present_after,
                    value_after.is_some(),
                    "contains/get must agree after put"
                );
                Ok::<(), proptest::test_runner::TestCaseError>(())
            })?;
        }
    }
}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::secondary_index::{IndexConfig, IndexType};
    use tempfile::TempDir;

    /// Extractor used by the index tests: exposes the whole blob as the single
    /// field `test_col.data`.
    #[derive(Debug)]
    struct TestExtractor;

    impl IndexExtractor for TestExtractor {
        fn extract(&self, _key: &Key, value: &CipherBlob) -> Vec<IndexedField> {
            vec![IndexedField {
                collection: "test_col".to_string(),
                field_name: "data".to_string(),
                value: value.as_bytes().to_vec(),
            }]
        }
    }

    /// Opens a plain (index-less) storage in a fresh temporary directory.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the
    /// storage is used; dropping it removes the directory, which also covers
    /// tests that fail part-way through.
    fn open_plain_storage() -> Result<(TempDir, LsmTreeStorage)> {
        let tmp = TempDir::new()?;
        let storage = LsmTreeStorage::new(tmp.path())?;
        Ok((tmp, storage))
    }

    /// Opens a storage in a fresh temporary directory with a non-unique BTree
    /// index `idx_test_col_data` on `test_col.data` and a [`TestExtractor`].
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the
    /// storage is used.
    fn make_indexed_storage() -> Result<(TempDir, LsmTreeStorage)> {
        let tmp = TempDir::new()?;
        let root = tmp.path().to_path_buf();

        let mgr = Arc::new(IndexManager::new());
        mgr.create_index(IndexConfig {
            name: "idx_test_col_data".to_string(),
            collection: "test_col".to_string(),
            field_name: "data".to_string(),
            index_type: IndexType::BTree,
            unique: false,
        })?;

        let config = LsmTreeConfig {
            wal_dir: root.join("wal"),
            data_dir: root,
            ..Default::default()
        };
        let storage = LsmTreeStorage::with_config(config)?
            .with_index_manager(mgr)
            .with_index_extractor(Arc::new(TestExtractor));

        Ok((tmp, storage))
    }

    /// Number of entries `idx_test_col_data` returns for `value`, or 0 when
    /// the manager or the index is missing.
    fn lookup_count(storage: &LsmTreeStorage, value: &[u8]) -> usize {
        storage
            .index_manager()
            .and_then(|m| m.with_index("idx_test_col_data", |idx| idx.lookup(value).len()))
            .unwrap_or(0)
    }

    #[tokio::test]
    async fn test_auto_index_on_put() -> Result<()> {
        let (_tmp, storage) = make_indexed_storage()?;

        let key = Key::from_str("rec_1");
        let value = CipherBlob::new(b"alice".to_vec());
        storage.put(&key, &value).await?;

        assert_eq!(
            lookup_count(&storage, b"alice"),
            1,
            "index should contain one entry after put"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_auto_index_updates_on_overwrite() -> Result<()> {
        let (_tmp, storage) = make_indexed_storage()?;

        let key = Key::from_str("rec_1");

        storage
            .put(&key, &CipherBlob::new(b"alice".to_vec()))
            .await?;
        assert_eq!(lookup_count(&storage, b"alice"), 1);

        storage.put(&key, &CipherBlob::new(b"bob".to_vec())).await?;

        assert_eq!(
            lookup_count(&storage, b"alice"),
            0,
            "old value index entry should be removed on overwrite"
        );
        assert_eq!(
            lookup_count(&storage, b"bob"),
            1,
            "new value index entry should be present after overwrite"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_auto_index_on_delete() -> Result<()> {
        let (_tmp, storage) = make_indexed_storage()?;

        let key = Key::from_str("rec_1");
        storage
            .put(&key, &CipherBlob::new(b"alice".to_vec()))
            .await?;
        assert_eq!(lookup_count(&storage, b"alice"), 1);

        storage.delete(&key).await?;

        assert_eq!(
            lookup_count(&storage, b"alice"),
            0,
            "index entry should be removed on delete"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_no_index_manager_noop() -> Result<()> {
        let (_tmp, storage) = open_plain_storage()?;

        let key = Key::from_str("k");
        let value = CipherBlob::new(b"v".to_vec());

        storage.put(&key, &value).await?;
        assert_eq!(storage.get(&key).await?, Some(value));

        storage.delete(&key).await?;
        assert_eq!(storage.get(&key).await?, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_basic() -> Result<()> {
        let (_tmp, storage) = open_plain_storage()?;

        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        storage.put(&key, &value).await?;

        let retrieved = storage.get(&key).await?;
        assert_eq!(retrieved, Some(value.clone()));

        storage.delete(&key).await?;
        let retrieved = storage.get(&key).await?;
        assert_eq!(retrieved, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_range() -> Result<()> {
        let (_tmp, storage) = open_plain_storage()?;

        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8]);
            storage.put(&key, &value).await?;
        }

        let start = Key::from_str("key_003");
        let end = Key::from_str("key_007");
        let results = storage.range(&start, &end).await?;

        assert!(!results.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_atomic_update() -> Result<()> {
        let (_tmp, storage) = open_plain_storage()?;
        let key = Key::from_str("counter");
        let initial = CipherBlob::new(vec![0]);

        storage.put(&key, &initial).await?;

        // Increment the first byte of the stored counter.
        storage
            .atomic_update(&key, |old| {
                let mut data = old.to_vec();
                if !data.is_empty() {
                    data[0] += 1;
                }
                Ok(CipherBlob::new(data))
            })
            .await?;

        let result = storage.get(&key).await?;
        assert_eq!(
            result
                .ok_or_else(|| AmateRSError::KeyNotFound(ErrorContext::new(
                    "Key not found".to_string()
                )))?
                .as_bytes()[0],
            1
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_keys() -> Result<()> {
        let (_tmp, storage) = open_plain_storage()?;

        for i in 0..5 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8]);
            storage.put(&key, &value).await?;
        }

        let keys = storage.keys().await?;
        assert_eq!(keys.len(), 5);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsm_storage_flush_and_close() -> Result<()> {
        let (_tmp, storage) = open_plain_storage()?;

        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3]);
        storage.put(&key, &value).await?;

        storage.flush().await?;

        storage.close().await?;

        Ok(())
    }
}