1use crate::{Error, Result, Record, Key, Item, SeqNo, Value, wal::Wal, sst::{SstWriter, SstReader}};
2use crate::iterator::{QueryParams, QueryResult, ScanParams, ScanResult};
3use crate::expression::{UpdateAction, UpdateExecutor, ExpressionContext, Expr, ExpressionEvaluator};
4use crate::index::{TableSchema, encode_index_key, decode_index_key};
5use crate::compaction::{CompactionManager, CompactionConfig, CompactionStatsAtomic};
6use crate::config::DatabaseConfig;
7use bytes::Bytes;
8use parking_lot::RwLock;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::fs;
13
/// Fixed record-count threshold for flushing a stripe's memtable.
///
/// NOTE(review): the configurable limits live in `DatabaseConfig`
/// (see `LsmInner::should_flush_stripe`); confirm this constant matches the
/// default `max_memtable_records`.
const MEMTABLE_THRESHOLD: usize = 1000;

/// Number of stripes allocated per engine; stripe ids are always `< NUM_STRIPES`.
const NUM_STRIPES: usize = 256;
16
/// Public handle to the storage engine.
///
/// All state lives in a single [`LsmInner`] guarded by one `RwLock` and shared
/// through an `Arc`.
pub struct LsmEngine {
    /// Shared, lock-protected engine state.
    inner: Arc<RwLock<LsmInner>>,
}
21
/// One stripe of the engine: an in-memory table plus the SSTs flushed from it.
struct Stripe {
    /// Encoded key bytes -> latest record (puts and tombstones) not yet flushed.
    memtable: BTreeMap<Vec<u8>, Record>,
    /// Running estimate of the memtable footprint (see `estimate_record_size`).
    memtable_size_bytes: usize,
    /// Readers for this stripe's SST files; `flush_stripe` inserts new ones at
    /// index 0 and `get` returns the first hit, i.e. newest first.
    ssts: Vec<SstReader>,
}
28
29impl Stripe {
30 fn new() -> Self {
31 Self {
32 memtable: BTreeMap::new(),
33 memtable_size_bytes: 0,
34 ssts: Vec::new(),
35 }
36 }
37
38 fn estimate_record_size(key_enc: &[u8], record: &Record) -> usize {
40 let mut size = key_enc.len(); size += std::mem::size_of::<SeqNo>(); if let Some(item) = &record.value {
45 for (attr_name, value) in item {
46 size += attr_name.len();
47 size += match value {
48 Value::S(s) => s.len(),
49 Value::N(n) => n.len(),
50 Value::B(b) => b.len(),
51 Value::Bool(_) => 1,
52 Value::Null => 0,
53 Value::Ts(_) => 8,
54 Value::L(list) => {
55 list.len() * 32 }
58 Value::M(map) => {
59 map.len() * 64 }
62 Value::VecF32(vec) => {
63 vec.len() * 4
65 }
66 };
67 }
68 }
69
70 size
71 }
72}
73
/// Lock-protected state behind an [`LsmEngine`].
struct LsmInner {
    /// Directory holding `wal.log` and the `.sst` files.
    dir: PathBuf,
    /// Write-ahead log every record is appended to before it reaches a memtable.
    wal: Wal,
    /// `NUM_STRIPES` stripes, indexed by `Key::stripe()`.
    stripes: Vec<Stripe>,
    /// Sequence number handed to the next record.
    next_seq: SeqNo,
    /// Id used for the next SST file written by a flush or a compaction.
    next_sst_id: u64,
    /// Table schema (indexes, stream configuration, expiry rules).
    schema: TableSchema,
    /// Bounded buffer of emitted stream records (oldest at the front).
    stream_buffer: std::collections::VecDeque<crate::stream::StreamRecord>,
    /// Compaction settings consulted after each stripe flush.
    compaction_config: CompactionConfig,
    /// Compaction counters.
    compaction_stats: CompactionStatsAtomic,
    /// Database configuration (memtable flush limits).
    config: DatabaseConfig,
}
86
/// One operation inside a `transact_write` call.
#[derive(Debug, Clone)]
pub enum TransactWriteOperation {
    /// Replace the item stored under the key, optionally guarded by a condition.
    Put {
        item: Item,
        condition: Option<Expr>,
    },
    /// Apply update actions to the stored item, optionally guarded by a condition.
    Update {
        actions: Vec<UpdateAction>,
        condition: Option<Expr>,
    },
    /// Remove the item stored under the key, optionally guarded by a condition.
    Delete {
        condition: Option<Expr>,
    },
    /// Write nothing; only require that the condition holds.
    ConditionCheck {
        condition: Expr,
    },
}
109
110impl TransactWriteOperation {
111 pub fn condition(&self) -> Option<&Expr> {
113 match self {
114 Self::Put { condition, .. } => condition.as_ref(),
115 Self::Update { condition, .. } => condition.as_ref(),
116 Self::Delete { condition } => condition.as_ref(),
117 Self::ConditionCheck { condition } => Some(condition),
118 }
119 }
120}
121
122impl LsmInner {
123 fn should_flush_stripe(&self, stripe_id: usize) -> bool {
125 let stripe = &self.stripes[stripe_id];
126
127 if stripe.memtable.len() >= self.config.max_memtable_records {
129 return true;
130 }
131
132 if let Some(max_bytes) = self.config.max_memtable_size_bytes {
134 if stripe.memtable_size_bytes >= max_bytes {
135 return true;
136 }
137 }
138
139 false
140 }
141
142 fn insert_into_memtable(&mut self, stripe_id: usize, key_enc: Vec<u8>, record: Record) {
144 let record_size = Stripe::estimate_record_size(&key_enc, &record);
145
146 if let Some(old_record) = self.stripes[stripe_id].memtable.get(&key_enc) {
148 let old_size = Stripe::estimate_record_size(&key_enc, old_record);
149 self.stripes[stripe_id].memtable_size_bytes =
150 self.stripes[stripe_id].memtable_size_bytes.saturating_sub(old_size);
151 }
152
153 self.stripes[stripe_id].memtable.insert(key_enc, record);
154 self.stripes[stripe_id].memtable_size_bytes += record_size;
155 }
156}
157
158impl LsmEngine {
159 pub fn create(dir: impl AsRef<Path>) -> Result<Self> {
161 Self::create_with_schema(dir, TableSchema::new())
162 }
163
164 pub fn create_with_schema(dir: impl AsRef<Path>, schema: TableSchema) -> Result<Self> {
166 Self::create_with_config(dir, DatabaseConfig::default(), schema)
167 }
168
169 pub fn create_with_config(
171 dir: impl AsRef<Path>,
172 config: DatabaseConfig,
173 schema: TableSchema,
174 ) -> Result<Self> {
175 config.validate().map_err(|e| Error::InvalidArgument(e))?;
177
178 let dir = dir.as_ref();
179 fs::create_dir_all(dir)?;
180
181 let wal_path = dir.join("wal.log");
182 if wal_path.exists() {
183 return Err(Error::AlreadyExists(dir.display().to_string()));
184 }
185
186 let wal = Wal::create(&wal_path)?;
187
188 let stripes = (0..NUM_STRIPES).map(|_| Stripe::new()).collect();
190
191 Ok(Self {
192 inner: Arc::new(RwLock::new(LsmInner {
193 dir: dir.to_path_buf(),
194 wal,
195 stripes,
196 next_seq: 1,
197 next_sst_id: 1,
198 schema,
199 stream_buffer: std::collections::VecDeque::new(),
200 compaction_config: CompactionConfig::default(),
201 compaction_stats: CompactionStatsAtomic::new(),
202 config,
203 })),
204 })
205 }
206
207 pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
209 let dir = dir.as_ref();
210 let wal_path = dir.join("wal.log");
211
212 let wal = Wal::open(&wal_path)?;
213
214 let mut stripes: Vec<Stripe> = (0..NUM_STRIPES).map(|_| Stripe::new()).collect();
216 let mut max_sst_id = 0u64;
217
218 for entry in fs::read_dir(dir)? {
220 let entry = entry?;
221 let path = entry.path();
222 if let Some(ext) = path.extension() {
223 if ext == "sst" {
224 if let Some(stem) = path.file_stem() {
225 if let Some(name) = stem.to_str() {
226 if let Some((stripe_str, id_str)) = name.split_once('-') {
228 if let (Ok(stripe), Ok(id)) = (stripe_str.parse::<usize>(), id_str.parse::<u64>()) {
230 if stripe < NUM_STRIPES {
231 max_sst_id = max_sst_id.max(id);
232 let reader = SstReader::open(&path)?;
233 stripes[stripe].ssts.push(reader);
234 }
235 }
236 } else {
237 if let Ok(id) = name.parse::<u64>() {
239 max_sst_id = max_sst_id.max(id);
240 let reader = SstReader::open(&path)?;
241 stripes[0].ssts.push(reader);
242 }
243 }
244 }
245 }
246 }
247 }
248 }
249
250 for stripe in &mut stripes {
252 stripe.ssts.reverse();
253 }
254
255 let records = wal.read_all()?;
257 let mut max_seq = 0;
258
259 for (_lsn, record) in records {
260 max_seq = max_seq.max(record.seq);
261 let key_enc = record.key.encode().to_vec();
262 let stripe_id = record.key.stripe() as usize;
263 stripes[stripe_id].memtable.insert(key_enc, record);
264 }
265
266 Ok(Self {
267 inner: Arc::new(RwLock::new(LsmInner {
268 dir: dir.to_path_buf(),
269 wal,
270 stripes,
271 next_seq: max_seq + 1,
272 next_sst_id: max_sst_id + 1,
273 schema: TableSchema::new(), stream_buffer: std::collections::VecDeque::new(),
275 compaction_config: CompactionConfig::default(),
276 compaction_stats: CompactionStatsAtomic::new(),
277 config: DatabaseConfig::default(), })),
279 })
280 }
281
282 pub fn put(&self, key: Key, item: Item) -> Result<()> {
284 let mut inner = self.inner.write();
285
286 let old_image = if inner.schema.stream_config.enabled {
288 let stripe_id = key.stripe() as usize;
289 let key_enc = key.encode().to_vec();
290 inner.stripes[stripe_id].memtable.get(&key_enc).and_then(|r| r.value.clone())
291 } else {
292 None
293 };
294
295 let seq = inner.next_seq;
296 inner.next_seq += 1;
297
298 let record = Record::put(key.clone(), item.clone(), seq);
299
300 inner.wal.append(record.clone())?;
302 inner.wal.flush()?;
303
304 let stripe_id = record.key.stripe() as usize;
306 let key_enc = record.key.encode().to_vec();
307 inner.insert_into_memtable(stripe_id, key_enc, record);
308
309 if !inner.schema.local_indexes.is_empty() {
311 self.materialize_lsi_entries(&mut inner, &key, &item)?;
312 }
313
314 if !inner.schema.global_indexes.is_empty() {
316 self.materialize_gsi_entries(&mut inner, &key, &item)?;
317 }
318
319 if inner.schema.stream_config.enabled {
321 let stream_record = if let Some(old) = old_image {
322 crate::stream::StreamRecord::modify(
323 seq,
324 key.clone(),
325 old,
326 item.clone(),
327 inner.schema.stream_config.view_type,
328 )
329 } else {
330 crate::stream::StreamRecord::insert(
331 seq,
332 key.clone(),
333 item.clone(),
334 inner.schema.stream_config.view_type,
335 )
336 };
337 self.emit_stream_record(&mut inner, stream_record);
338 }
339
340 if inner.should_flush_stripe(stripe_id) {
342 self.flush_stripe(&mut inner, stripe_id)?;
343 }
344
345 Ok(())
346 }
347
348 pub fn put_conditional(&self, key: Key, item: Item, condition: &Expr, context: &ExpressionContext) -> Result<()> {
350 let current_item = self.get(&key)?.unwrap_or_else(|| std::collections::HashMap::new());
352
353 let evaluator = ExpressionEvaluator::new(¤t_item, context);
355 let condition_passed = evaluator.evaluate(condition)?;
356
357 if !condition_passed {
358 return Err(Error::ConditionalCheckFailed("Put condition failed".into()));
359 }
360
361 self.put(key, item)
363 }
364
365 pub fn get(&self, key: &Key) -> Result<Option<Item>> {
367 let inner = self.inner.read();
368
369 let stripe_id = key.stripe() as usize;
371 let stripe = &inner.stripes[stripe_id];
372 let key_enc = key.encode().to_vec();
373
374 if let Some(record) = stripe.memtable.get(&key_enc) {
376 if let Some(item) = &record.value {
377 if inner.schema.is_expired(item) {
379 drop(inner); self.delete(key.clone())?;
382 return Ok(None);
383 }
384 }
385 return Ok(record.value.clone());
386 }
387
388 for sst in &stripe.ssts {
390 if let Some(record) = sst.get(key) {
391 if let Some(item) = &record.value {
392 if inner.schema.is_expired(item) {
394 drop(inner); self.delete(key.clone())?;
397 return Ok(None);
398 }
399 }
400 return Ok(record.value.clone());
401 }
402 }
403
404 Ok(None)
405 }
406
407 pub fn delete(&self, key: Key) -> Result<()> {
409 let mut inner = self.inner.write();
410
411 let old_image = if inner.schema.stream_config.enabled {
413 let stripe_id = key.stripe() as usize;
414 let key_enc = key.encode().to_vec();
415 inner.stripes[stripe_id].memtable.get(&key_enc).and_then(|r| r.value.clone())
416 } else {
417 None
418 };
419
420 let seq = inner.next_seq;
421 inner.next_seq += 1;
422
423 let record = Record::delete(key.clone(), seq);
424
425 inner.wal.append(record.clone())?;
427 inner.wal.flush()?;
428
429 let stripe_id = record.key.stripe() as usize;
431 let key_enc = record.key.encode().to_vec();
432 inner.stripes[stripe_id].memtable.insert(key_enc, record);
433
434 if inner.schema.stream_config.enabled {
436 if let Some(old) = old_image {
437 let stream_record = crate::stream::StreamRecord::remove(
438 seq,
439 key.clone(),
440 old,
441 inner.schema.stream_config.view_type,
442 );
443 self.emit_stream_record(&mut inner, stream_record);
444 }
445 }
446
447 if inner.should_flush_stripe(stripe_id) {
449 self.flush_stripe(&mut inner, stripe_id)?;
450 }
451
452 Ok(())
453 }
454
455 pub fn delete_conditional(&self, key: Key, condition: &Expr, context: &ExpressionContext) -> Result<()> {
457 let current_item = self.get(&key)?.unwrap_or_else(|| std::collections::HashMap::new());
459
460 let evaluator = ExpressionEvaluator::new(¤t_item, context);
462 let condition_passed = evaluator.evaluate(condition)?;
463
464 if !condition_passed {
465 return Err(Error::ConditionalCheckFailed("Delete condition failed".into()));
466 }
467
468 self.delete(key)
470 }
471
472 pub fn update(&self, key: &Key, actions: &[UpdateAction], context: &ExpressionContext) -> Result<Item> {
474 let current_item = self.get(key)?.unwrap_or_else(|| std::collections::HashMap::new());
476
477 let executor = UpdateExecutor::new(context);
479 let updated_item = executor.execute(¤t_item, actions)?;
480
481 self.put(key.clone(), updated_item.clone())?;
483
484 Ok(updated_item)
485 }
486
487 pub fn update_conditional(
489 &self,
490 key: &Key,
491 actions: &[UpdateAction],
492 condition: &Expr,
493 context: &ExpressionContext,
494 ) -> Result<Item> {
495 let current_item = self.get(key)?.unwrap_or_else(|| std::collections::HashMap::new());
497
498 let evaluator = ExpressionEvaluator::new(¤t_item, context);
500 let condition_passed = evaluator.evaluate(condition)?;
501
502 if !condition_passed {
503 return Err(Error::ConditionalCheckFailed("Update condition failed".into()));
504 }
505
506 let executor = UpdateExecutor::new(context);
508 let updated_item = executor.execute(¤t_item, actions)?;
509
510 self.put(key.clone(), updated_item.clone())?;
512
513 Ok(updated_item)
514 }
515
516 pub fn query(&self, params: QueryParams) -> Result<QueryResult> {
518 let inner = self.inner.read();
519
520 let stripe_id = {
522 let temp_key = Key::new(params.pk.clone());
523 temp_key.stripe() as usize
524 };
525 let stripe = &inner.stripes[stripe_id];
526
527 let mut items = Vec::new();
528 let mut seen_keys: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
529 let mut scanned_count = 0;
530 let mut last_key = None;
531
532 let mut all_records: BTreeMap<Vec<u8>, Record> = BTreeMap::new();
535
536 let is_index_query = params.index_name.is_some();
538
539 for (key_enc, record) in &stripe.memtable {
541 if is_index_query {
542 if let Some(index_name) = ¶ms.index_name {
544 if let Some((idx_name, idx_pk, idx_sk)) = decode_index_key(key_enc) {
545 if idx_name != *index_name {
547 continue;
548 }
549
550 if idx_pk != params.pk {
552 continue;
553 }
554
555 if !params.matches_sk(&Some(idx_sk)) {
557 continue;
558 }
559
560 all_records.insert(key_enc.clone(), record.clone());
561 }
562 }
563 } else {
564 if record.key.pk != params.pk {
567 continue;
568 }
569
570 if !params.matches_sk(&record.key.sk) {
572 continue;
573 }
574
575 all_records.insert(key_enc.clone(), record.clone());
576 }
577 }
578
579 for _sst in &stripe.ssts {
581 }
585
586 let mut sorted_records: Vec<(Vec<u8>, Record)> = all_records.into_iter().collect();
588
589 if !params.forward {
590 sorted_records.reverse();
591 }
592
593 for (key_enc, record) in sorted_records {
595 if params.should_skip(&record.key) {
597 continue;
598 }
599
600 scanned_count += 1;
601
602 if seen_keys.contains(&key_enc) {
604 continue;
605 }
606 seen_keys.insert(key_enc);
607
608 if record.value.is_none() {
610 continue;
611 }
612
613 if let Some(ref item) = record.value {
615 if inner.schema.is_expired(item) {
616 continue; }
618 }
619
620 last_key = Some(record.key.clone());
621
622 if let Some(item) = record.value {
623 items.push(item);
624
625 if let Some(limit) = params.limit {
627 if items.len() >= limit {
628 break;
629 }
630 }
631 }
632 }
633
634 Ok(QueryResult::new(items, last_key, scanned_count))
635 }
636
637 pub fn batch_get(&self, keys: &[Key]) -> Result<std::collections::HashMap<Key, Option<Item>>> {
639 let mut results = std::collections::HashMap::new();
640
641 for key in keys {
642 let item = self.get(key)?;
643 results.insert(key.clone(), item);
644 }
645
646 Ok(results)
647 }
648
649 pub fn batch_write(&self, operations: &[(Key, Option<Item>)]) -> Result<usize> {
651 let mut processed = 0;
652
653 for (key, item_opt) in operations {
654 match item_opt {
655 Some(item) => {
656 self.put(key.clone(), item.clone())?;
657 processed += 1;
658 }
659 None => {
660 self.delete(key.clone())?;
661 processed += 1;
662 }
663 }
664 }
665
666 Ok(processed)
667 }
668
669 pub fn transact_get(&self, keys: &[Key]) -> Result<Vec<Option<Item>>> {
671 let _inner = self.inner.read();
673
674 let mut items = Vec::new();
675 for key in keys {
676 let item = self.get(key)?;
677 items.push(item);
678 }
679
680 Ok(items)
681 }
682
683 pub fn transact_write(
685 &self,
686 operations: &[(Key, TransactWriteOperation)],
687 context: &ExpressionContext,
688 ) -> Result<usize> {
689 let mut inner = self.inner.write();
691
692 let mut current_items: Vec<Option<Item>> = Vec::new();
694 for (key, op) in operations {
695 let item = {
696 let stripe_id = key.stripe() as usize;
697 let stripe = &inner.stripes[stripe_id];
698 let key_enc = key.encode().to_vec();
699
700 if let Some(record) = stripe.memtable.get(&key_enc) {
702 record.value.clone()
703 } else {
704 let mut found = None;
706 for sst in &stripe.ssts {
707 if let Some(record) = sst.get(key) {
708 found = record.value.clone();
709 break;
710 }
711 }
712 found
713 }
714 };
715
716 current_items.push(item.clone());
717
718 if let Some(condition_expr) = op.condition() {
720 let current_item = item.unwrap_or_else(|| std::collections::HashMap::new());
721 let evaluator = ExpressionEvaluator::new(¤t_item, context);
722 let condition_passed = evaluator.evaluate(condition_expr)?;
723
724 if !condition_passed {
725 return Err(Error::TransactionCanceled(format!(
726 "Condition failed for key {:?}",
727 key
728 )));
729 }
730 }
731 }
732
733 let mut committed = 0;
735 for (i, (key, op)) in operations.iter().enumerate() {
736 match op {
737 TransactWriteOperation::Put { item, .. } => {
738 let seq = inner.next_seq;
740 inner.next_seq += 1;
741 let record = Record::put(key.clone(), item.clone(), seq);
742 inner.wal.append(record.clone())?;
743 inner.wal.flush()?;
744
745 let stripe_id = record.key.stripe() as usize;
746 let key_enc = record.key.encode().to_vec();
747 inner.stripes[stripe_id].memtable.insert(key_enc, record);
748
749 if inner.stripes[stripe_id].memtable.len() >= MEMTABLE_THRESHOLD {
750 self.flush_stripe(&mut inner, stripe_id)?;
751 }
752
753 committed += 1;
754 }
755 TransactWriteOperation::Delete { .. } => {
756 let seq = inner.next_seq;
758 inner.next_seq += 1;
759 let record = Record::delete(key.clone(), seq);
760 inner.wal.append(record.clone())?;
761 inner.wal.flush()?;
762
763 let stripe_id = record.key.stripe() as usize;
764 let key_enc = record.key.encode().to_vec();
765 inner.stripes[stripe_id].memtable.insert(key_enc, record);
766
767 if inner.stripes[stripe_id].memtable.len() >= MEMTABLE_THRESHOLD {
768 self.flush_stripe(&mut inner, stripe_id)?;
769 }
770
771 committed += 1;
772 }
773 TransactWriteOperation::Update { actions, .. } => {
774 let current_item = current_items[i].clone().unwrap_or_else(|| std::collections::HashMap::new());
776 let executor = UpdateExecutor::new(context);
777 let updated_item = executor.execute(¤t_item, actions)?;
778
779 let seq = inner.next_seq;
780 inner.next_seq += 1;
781 let record = Record::put(key.clone(), updated_item, seq);
782 inner.wal.append(record.clone())?;
783 inner.wal.flush()?;
784
785 let stripe_id = record.key.stripe() as usize;
786 let key_enc = record.key.encode().to_vec();
787 inner.stripes[stripe_id].memtable.insert(key_enc, record);
788
789 if inner.stripes[stripe_id].memtable.len() >= MEMTABLE_THRESHOLD {
790 self.flush_stripe(&mut inner, stripe_id)?;
791 }
792
793 committed += 1;
794 }
795 TransactWriteOperation::ConditionCheck { .. } => {
796 committed += 1;
798 }
799 }
800 }
801
802 Ok(committed)
803 }
804
805 pub fn scan(&self, params: ScanParams) -> Result<ScanResult> {
807 let inner = self.inner.read();
808
809 let mut all_records: BTreeMap<Vec<u8>, Record> = BTreeMap::new();
811
812 for stripe_id in 0..NUM_STRIPES {
814 if !params.should_scan_stripe(stripe_id) {
816 continue;
817 }
818
819 let stripe = &inner.stripes[stripe_id];
820
821 for (key_enc, record) in &stripe.memtable {
823 if record.value.is_none() {
825 continue;
826 }
827
828 all_records.insert(key_enc.clone(), record.clone());
829 }
830
831 }
834
835 let mut items = Vec::new();
837 let mut scanned_count = 0;
838 let mut last_key = None;
839
840 for (_, record) in all_records {
841 if params.should_skip(&record.key) {
843 continue;
844 }
845
846 scanned_count += 1;
847
848 if let Some(ref item) = record.value {
850 if inner.schema.is_expired(item) {
851 continue; }
853 }
854
855 last_key = Some(record.key.clone());
856
857 if let Some(item) = record.value {
858 items.push(item);
859
860 if let Some(limit) = params.limit {
862 if items.len() >= limit {
863 return Ok(ScanResult::new(items, last_key, scanned_count));
864 }
865 }
866 }
867 }
868
869 Ok(ScanResult::new(items, last_key, scanned_count))
870 }
871
872 fn materialize_lsi_entries(&self, inner: &mut LsmInner, key: &Key, item: &Item) -> Result<()> {
874 for lsi in &inner.schema.local_indexes {
876 if let Some(index_sk_value) = item.get(&lsi.sort_key_attribute) {
878 let index_sk_bytes = match index_sk_value {
880 Value::S(s) => Bytes::copy_from_slice(s.as_bytes()),
881 Value::N(n) => Bytes::copy_from_slice(n.as_bytes()),
882 Value::B(b) => b.clone(),
883 Value::Bool(b) => Bytes::copy_from_slice(if *b { b"true" } else { b"false" }),
884 Value::Ts(ts) => Bytes::copy_from_slice(&ts.to_le_bytes()),
885 _ => continue, };
887
888 let index_key_encoded = encode_index_key(&lsi.name, &key.pk, &index_sk_bytes);
890
891 let index_item = item.clone(); let index_key = Key::new(Bytes::copy_from_slice(&index_key_encoded));
897
898 let seq = inner.next_seq;
899 inner.next_seq += 1;
900
901 let index_record = Record::put(index_key, index_item, seq);
902
903 inner.wal.append(index_record.clone())?;
905
906 let stripe_id = key.stripe() as usize;
908 inner.stripes[stripe_id].memtable.insert(index_key_encoded, index_record);
909 }
910 }
911
912 Ok(())
913 }
914
915 fn materialize_gsi_entries(&self, inner: &mut LsmInner, base_key: &Key, item: &Item) -> Result<()> {
917 for gsi in &inner.schema.global_indexes {
919 if let Some(gsi_pk_value) = item.get(&gsi.partition_key_attribute) {
921 let gsi_pk_bytes = match gsi_pk_value {
923 Value::S(s) => Bytes::copy_from_slice(s.as_bytes()),
924 Value::N(n) => Bytes::copy_from_slice(n.as_bytes()),
925 Value::B(b) => b.clone(),
926 Value::Bool(b) => Bytes::copy_from_slice(if *b { b"true" } else { b"false" }),
927 Value::Ts(ts) => Bytes::copy_from_slice(&ts.to_le_bytes()),
928 _ => continue, };
930
931 let mut gsi_sk_bytes = if let Some(gsi_sk_attr) = &gsi.sort_key_attribute {
933 if let Some(gsi_sk_value) = item.get(gsi_sk_attr) {
934 match gsi_sk_value {
935 Value::S(s) => Bytes::copy_from_slice(s.as_bytes()),
936 Value::N(n) => Bytes::copy_from_slice(n.as_bytes()),
937 Value::B(b) => b.clone(),
938 Value::Bool(b) => Bytes::copy_from_slice(if *b { b"true" } else { b"false" }),
939 Value::Ts(ts) => Bytes::copy_from_slice(&ts.to_le_bytes()),
940 _ => Bytes::new(), }
942 } else {
943 continue; }
945 } else {
946 Bytes::new() };
948
949 let base_pk_encoded = base_key.encode();
952 let mut combined_sk = Vec::with_capacity(gsi_sk_bytes.len() + base_pk_encoded.len());
953 combined_sk.extend_from_slice(&gsi_sk_bytes);
954 combined_sk.extend_from_slice(&base_pk_encoded);
955 gsi_sk_bytes = Bytes::from(combined_sk);
956
957 let index_key_encoded = encode_index_key(&gsi.name, &gsi_pk_bytes, &gsi_sk_bytes);
959
960 let index_item = item.clone(); let index_key = Key::new(Bytes::copy_from_slice(&index_key_encoded));
965
966 let seq = inner.next_seq;
967 inner.next_seq += 1;
968
969 let index_record = Record::put(index_key, index_item, seq);
970
971 inner.wal.append(index_record.clone())?;
973
974 let gsi_stripe_key = Key::new(gsi_pk_bytes.clone());
977 let gsi_stripe_id = gsi_stripe_key.stripe() as usize;
978 inner.stripes[gsi_stripe_id].memtable.insert(index_key_encoded, index_record);
979 }
980 }
981
982 Ok(())
983 }
984
985 pub fn read_stream(&self, after_sequence_number: Option<u64>) -> Result<Vec<crate::stream::StreamRecord>> {
990 let inner = self.inner.read();
991
992 if !inner.schema.stream_config.enabled {
993 return Ok(Vec::new());
994 }
995
996 let records: Vec<crate::stream::StreamRecord> = inner.stream_buffer
997 .iter()
998 .filter(|record| {
999 if let Some(after) = after_sequence_number {
1000 record.sequence_number > after
1001 } else {
1002 true
1003 }
1004 })
1005 .cloned()
1006 .collect();
1007
1008 Ok(records)
1009 }
1010
1011 fn emit_stream_record(&self, inner: &mut LsmInner, record: crate::stream::StreamRecord) {
1013 if !inner.schema.stream_config.enabled {
1014 return;
1015 }
1016
1017 inner.stream_buffer.push_back(record);
1019
1020 while inner.stream_buffer.len() > inner.schema.stream_config.buffer_size {
1022 inner.stream_buffer.pop_front();
1023 }
1024 }
1025
    /// Writes the stripe's memtable to a new SST file and, if compaction is
    /// enabled and the stripe now holds at least `sst_threshold` SSTs, merges
    /// all of the stripe's SSTs into one.
    ///
    /// Does nothing when the memtable is empty.
    ///
    /// # Errors
    ///
    /// Returns any error from writing or opening the SST, from compaction, or
    /// from removing the compacted input files.
    fn flush_stripe(&self, inner: &mut LsmInner, stripe_id: usize) -> Result<()> {
        if inner.stripes[stripe_id].memtable.is_empty() {
            return Ok(());
        }

        let sst_id = inner.next_sst_id;
        inner.next_sst_id += 1;

        // File name is "<stripe, zero-padded to 3 digits>-<sst id>.sst".
        let sst_path = inner.dir.join(format!("{:03}-{}.sst", stripe_id, sst_id));

        // `values()` walks the memtable in encoded-key order.
        let mut writer = SstWriter::new();
        for record in inner.stripes[stripe_id].memtable.values() {
            writer.add(record.clone());
        }
        writer.finish(&sst_path)?;

        let reader = SstReader::open(&sst_path)?;

        // Newest SST goes to the front of the list.
        inner.stripes[stripe_id].ssts.insert(0, reader);

        // Only clear the memtable once the SST is written and readable.
        inner.stripes[stripe_id].memtable.clear();
        inner.stripes[stripe_id].memtable_size_bytes = 0;

        if inner.compaction_config.enabled && inner.stripes[stripe_id].ssts.len() >= inner.compaction_config.sst_threshold {
            let _guard = inner.compaction_stats.start_compaction();

            let compaction_mgr = CompactionManager::new(stripe_id, inner.dir.clone());
            let ssts_to_compact = &inner.stripes[stripe_id].ssts;
            let sst_count = ssts_to_compact.len();

            let compacted_sst_id = inner.next_sst_id;
            inner.next_sst_id += 1;

            let (new_sst, old_paths) = compaction_mgr.compact(ssts_to_compact, compacted_sst_id)?;

            inner.compaction_stats.record_ssts_merged(sst_count as u64);
            inner.compaction_stats.record_ssts_created(1);

            // Replace the whole SST list with the single compacted file.
            inner.stripes[stripe_id].ssts.clear();
            inner.stripes[stripe_id].ssts.push(new_sst);

            // Input files are removed only after the readers were swapped.
            compaction_mgr.cleanup_old_ssts(old_paths)?;
        }

        Ok(())
    }
1085
1086 pub fn flush(&self) -> Result<()> {
1088 let mut inner = self.inner.write();
1089
1090 for stripe_id in 0..NUM_STRIPES {
1092 if !inner.stripes[stripe_id].memtable.is_empty() {
1093 self.flush_stripe(&mut inner, stripe_id)?;
1094 }
1095 }
1096
1097 Ok(())
1098 }
1099
1100 pub fn set_compaction_config(&self, config: CompactionConfig) {
1121 let mut inner = self.inner.write();
1122 inner.compaction_config = config;
1123 }
1124
1125 pub fn compaction_config(&self) -> CompactionConfig {
1127 let inner = self.inner.read();
1128 inner.compaction_config.clone()
1129 }
1130
1131 pub fn compaction_stats(&self) -> crate::compaction::CompactionStats {
1155 let inner = self.inner.read();
1156 inner.compaction_stats.snapshot()
1157 }
1158
1159 pub fn trigger_compaction(&self, stripe_id: usize) -> Result<()> {
1164 if stripe_id >= NUM_STRIPES {
1165 return Err(Error::InvalidArgument(format!(
1166 "Invalid stripe_id: {}, must be < {}",
1167 stripe_id, NUM_STRIPES
1168 )));
1169 }
1170
1171 let mut inner = self.inner.write();
1172
1173 if inner.stripes[stripe_id].ssts.len() >= inner.compaction_config.sst_threshold {
1175 let _guard = inner.compaction_stats.start_compaction();
1176 let compaction_mgr = CompactionManager::new(stripe_id, inner.dir.clone());
1177
1178 let sst_count = inner.stripes[stripe_id].ssts.len();
1179 let compacted_sst_id = inner.next_sst_id;
1180 inner.next_sst_id += 1;
1181
1182 let ssts_to_compact = &inner.stripes[stripe_id].ssts;
1183 let (new_sst, old_paths) = compaction_mgr.compact(ssts_to_compact, compacted_sst_id)?;
1184
1185 inner.compaction_stats.record_ssts_merged(sst_count as u64);
1186 inner.compaction_stats.record_ssts_created(1);
1187
1188 inner.stripes[stripe_id].ssts.clear();
1189 inner.stripes[stripe_id].ssts.push(new_sst);
1190
1191 compaction_mgr.cleanup_old_ssts(old_paths)?;
1192 }
1193
1194 Ok(())
1195 }
1196}
1197
1198#[cfg(test)]
1199mod tests {
1200 use super::*;
1201 use crate::Value;
1202 use tempfile::TempDir;
1203 use std::collections::HashMap;
1204
1205 #[test]
1206 fn test_lsm_create() {
1207 let dir = TempDir::new().unwrap();
1208 let _db = LsmEngine::create(dir.path()).unwrap();
1209 }
1210
1211 #[test]
1212 fn test_lsm_put_get() {
1213 let dir = TempDir::new().unwrap();
1214 let db = LsmEngine::create(dir.path()).unwrap();
1215
1216 let key = Key::new(b"user#123".to_vec());
1217 let mut item = HashMap::new();
1218 item.insert("name".to_string(), Value::string("Alice"));
1219 item.insert("age".to_string(), Value::number(30));
1220
1221 db.put(key.clone(), item.clone()).unwrap();
1222
1223 let result = db.get(&key).unwrap();
1224 assert_eq!(result, Some(item));
1225 }
1226
1227 #[test]
1228 fn test_lsm_delete() {
1229 let dir = TempDir::new().unwrap();
1230 let db = LsmEngine::create(dir.path()).unwrap();
1231
1232 let key = Key::new(b"user#123".to_vec());
1233 let mut item = HashMap::new();
1234 item.insert("name".to_string(), Value::string("Bob"));
1235
1236 db.put(key.clone(), item).unwrap();
1237 assert!(db.get(&key).unwrap().is_some());
1238
1239 db.delete(key.clone()).unwrap();
1240 assert!(db.get(&key).unwrap().is_none());
1241 }
1242
1243 #[test]
1244 fn test_lsm_reopen() {
1245 let dir = TempDir::new().unwrap();
1246 let path = dir.path().to_path_buf();
1247
1248 let key = Key::new(b"persistent".to_vec());
1249 let mut item = HashMap::new();
1250 item.insert("data".to_string(), Value::string("test"));
1251
1252 {
1254 let db = LsmEngine::create(&path).unwrap();
1255 db.put(key.clone(), item.clone()).unwrap();
1256 }
1257
1258 let db = LsmEngine::open(&path).unwrap();
1260 let result = db.get(&key).unwrap();
1261 assert_eq!(result, Some(item));
1262 }
1263
1264 #[test]
1265 fn test_lsm_flush() {
1266 let dir = TempDir::new().unwrap();
1267 let db = LsmEngine::create(dir.path()).unwrap();
1268
1269 for i in 0..MEMTABLE_THRESHOLD + 10 {
1271 let key = Key::new(format!("key{}", i).into_bytes());
1272 let mut item = HashMap::new();
1273 item.insert("value".to_string(), Value::number(i));
1274 db.put(key, item).unwrap();
1275 }
1276
1277 for i in 0..MEMTABLE_THRESHOLD + 10 {
1279 let key = Key::new(format!("key{}", i).into_bytes());
1280 let result = db.get(&key).unwrap();
1281 assert!(result.is_some());
1282 }
1283 }
1284
1285 #[test]
1286 fn test_lsm_overwrite() {
1287 let dir = TempDir::new().unwrap();
1288 let db = LsmEngine::create(dir.path()).unwrap();
1289
1290 let key = Key::new(b"counter".to_vec());
1291
1292 for i in 0..5 {
1293 let mut item = HashMap::new();
1294 item.insert("value".to_string(), Value::number(i));
1295 db.put(key.clone(), item).unwrap();
1296 }
1297
1298 let result = db.get(&key).unwrap().unwrap();
1299 match result.get("value").unwrap() {
1300 Value::N(n) => assert_eq!(n, "4"),
1301 _ => panic!("Expected number value"),
1302 }
1303 }
1304
1305 #[test]
1306 fn test_lsm_striping() {
1307 let dir = TempDir::new().unwrap();
1308 let db = LsmEngine::create(dir.path()).unwrap();
1309
1310 let mut keys_by_stripe: HashMap<u8, Vec<Key>> = HashMap::new();
1312
1313 for i in 0..1000 {
1314 let key = Key::new(format!("key{}", i).into_bytes());
1315 let stripe = key.stripe();
1316
1317 let mut item = HashMap::new();
1318 item.insert("id".to_string(), Value::number(i));
1319 db.put(key.clone(), item).unwrap();
1320
1321 keys_by_stripe.entry(stripe).or_insert_with(Vec::new).push(key);
1322 }
1323
1324 assert!(keys_by_stripe.len() > 1, "Expected keys to be distributed across multiple stripes");
1326
1327 for (stripe, keys) in keys_by_stripe {
1329 for key in keys {
1330 let result = db.get(&key).unwrap();
1331 assert!(result.is_some(), "Key should exist in stripe {}", stripe);
1332 }
1333 }
1334 }
1335
1336 #[test]
1337 fn test_lsm_stripe_independent_flush() {
1338 let dir = TempDir::new().unwrap();
1339 let db = LsmEngine::create(dir.path()).unwrap();
1340
1341 let pk = b"user#123";
1343 let base_key = Key::new(pk.to_vec());
1344 let stripe = base_key.stripe();
1345
1346 for i in 0..MEMTABLE_THRESHOLD + 10 {
1348 let key = Key::with_sk(pk.to_vec(), format!("item#{}", i).into_bytes());
1349 assert_eq!(key.stripe(), stripe);
1351
1352 let mut item = HashMap::new();
1353 item.insert("value".to_string(), Value::number(i));
1354 db.put(key, item).unwrap();
1355 }
1356
1357 db.flush().unwrap();
1359
1360 let mut found_striped_sst = false;
1362 for entry in fs::read_dir(dir.path()).unwrap() {
1363 let entry = entry.unwrap();
1364 if let Some(name) = entry.file_name().to_str() {
1365 if name.ends_with(".sst") && name.starts_with(&format!("{:03}-", stripe)) {
1366 found_striped_sst = true;
1367 break;
1368 }
1369 }
1370 }
1371
1372 assert!(found_striped_sst, "Expected SST file with stripe prefix");
1373 }
1374
/// Stores ten items under one partition key and expects an unconditioned query
/// on that partition to return all ten, with a scanned count of ten.
#[test]
fn test_lsm_query_basic() {
    use crate::iterator::QueryParams;
    use bytes::Bytes;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    let partition = b"user#123";

    for n in 0..10 {
        let mut attrs = HashMap::new();
        attrs.insert("id".to_string(), Value::number(n));
        attrs.insert("name".to_string(), Value::string(format!("Item {}", n)));

        let sort_key = format!("item#{:03}", n).into_bytes();
        engine.put(Key::with_sk(partition.to_vec(), sort_key), attrs).unwrap();
    }

    let outcome = engine
        .query(QueryParams::new(Bytes::from(partition.to_vec())))
        .unwrap();

    assert_eq!(outcome.items.len(), 10);
    assert_eq!(outcome.scanned_count, 10);
}
1401
/// Stores twenty items under one partition key and queries with a limit of
/// five, expecting exactly five items and a populated `last_key`.
#[test]
fn test_lsm_query_with_limit() {
    use crate::iterator::QueryParams;
    use bytes::Bytes;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    let partition = b"user#456";

    for n in 0..20 {
        let mut attrs = HashMap::new();
        attrs.insert("id".to_string(), Value::number(n));

        let sort_key = format!("item#{:03}", n).into_bytes();
        engine.put(Key::with_sk(partition.to_vec(), sort_key), attrs).unwrap();
    }

    let limited = QueryParams::new(Bytes::from(partition.to_vec())).with_limit(5);
    let outcome = engine.query(limited).unwrap();

    assert_eq!(outcome.items.len(), 5);
    assert!(outcome.last_key.is_some());
}
1427
/// Checks that a `BeginsWith` sort-key condition actually narrows the result.
///
/// Twenty items with sort keys `item#000`..`item#019` are stored under one
/// partition key. Only `item#000`..`item#009` start with `item#00`, so a query
/// that ignored the condition would return twenty items and fail the assertion.
#[test]
fn test_lsm_query_with_sk_condition() {
    use crate::iterator::{QueryParams, SortKeyCondition};
    use bytes::Bytes;

    let dir = TempDir::new().unwrap();
    let db = LsmEngine::create(dir.path()).unwrap();

    let pk = b"user#789";

    // Write both matching (000..=009) and non-matching (010..=019) sort keys so
    // the condition has something to exclude.
    for i in 0..20 {
        let key = Key::with_sk(pk.to_vec(), format!("item#{:03}", i).into_bytes());
        let mut item = HashMap::new();
        item.insert("id".to_string(), Value::number(i));
        db.put(key, item).unwrap();
    }

    let params = QueryParams::new(Bytes::from(pk.to_vec()))
        .with_sk_condition(SortKeyCondition::BeginsWith, Bytes::from("item#00"), None);
    let result = db.query(params).unwrap();

    // Exactly the ten keys sharing the "item#00" prefix must come back.
    assert_eq!(result.items.len(), 10);
}
1454
/// Stores five items (`item#0`..`item#4`) under one partition key, queries with
/// `with_direction(false)`, and expects all five back with the item whose `id`
/// is 4 in first position.
#[test]
fn test_lsm_query_reverse() {
    use crate::iterator::QueryParams;
    use bytes::Bytes;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    let partition = b"user#999";

    for n in 0..5 {
        let mut attrs = HashMap::new();
        attrs.insert("id".to_string(), Value::number(n));

        let sort_key = format!("item#{}", n).into_bytes();
        engine.put(Key::with_sk(partition.to_vec(), sort_key), attrs).unwrap();
    }

    let reversed = QueryParams::new(Bytes::from(partition.to_vec())).with_direction(false);
    let outcome = engine.query(reversed).unwrap();

    assert_eq!(outcome.items.len(), 5);

    // The highest sort key was written with id 4, so it must lead the result.
    match outcome.items[0].get("id") {
        Some(Value::N(n)) => assert_eq!(n, "4"),
        _ => panic!("Expected number value"),
    }
}
1485
/// Stores twenty items under distinct partition keys and expects a default scan
/// to return all twenty, with a scanned count of twenty.
#[test]
fn test_lsm_scan_basic() {
    use crate::iterator::ScanParams;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for n in 0..20 {
        let mut attrs = HashMap::new();
        attrs.insert("id".to_string(), Value::number(n));

        let partition = format!("user#{}", n).into_bytes();
        engine.put(Key::new(partition), attrs).unwrap();
    }

    let outcome = engine.scan(ScanParams::new()).unwrap();

    assert_eq!(outcome.items.len(), 20);
    assert_eq!(outcome.scanned_count, 20);
}
1509
/// Stores fifty items and scans with a limit of ten, expecting exactly ten
/// items and a populated `last_key`.
#[test]
fn test_lsm_scan_with_limit() {
    use crate::iterator::ScanParams;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for n in 0..50 {
        let mut attrs = HashMap::new();
        attrs.insert("value".to_string(), Value::number(n));

        let partition = format!("item#{:03}", n).into_bytes();
        engine.put(Key::new(partition), attrs).unwrap();
    }

    let outcome = engine.scan(ScanParams::new().with_limit(10)).unwrap();

    assert_eq!(outcome.items.len(), 10);
    assert!(outcome.last_key.is_some());
}
1533
/// Stores 100 items, scans them as four segments via `with_segment(i, 4)`, and
/// expects the per-segment item counts to add up to 100.
#[test]
fn test_lsm_scan_parallel() {
    use crate::iterator::ScanParams;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for n in 0..100 {
        let mut attrs = HashMap::new();
        attrs.insert("value".to_string(), Value::number(n));

        let partition = format!("key{}", n).into_bytes();
        engine.put(Key::new(partition), attrs).unwrap();
    }

    // Scan each of the four segments in turn and add up what they return.
    let total_items: usize = (0..4)
        .map(|segment| {
            let segment_params = ScanParams::new().with_segment(segment, 4);
            engine.scan(segment_params).unwrap().items.len()
        })
        .sum();

    assert_eq!(total_items, 100);
}
1561
/// Stores thirty items and walks them in three scans of ten, feeding each
/// scan's `last_key` into the next scan's `with_start_key`.
///
/// Only page sizes are asserted; the test does not check that the pages are
/// disjoint.
#[test]
fn test_lsm_scan_pagination() {
    use crate::iterator::ScanParams;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for n in 0..30 {
        let mut attrs = HashMap::new();
        attrs.insert("id".to_string(), Value::number(n));

        let partition = format!("user#{:03}", n).into_bytes();
        engine.put(Key::new(partition), attrs).unwrap();
    }

    // Page 1: no start key.
    let first_page = engine.scan(ScanParams::new().with_limit(10)).unwrap();
    assert_eq!(first_page.items.len(), 10);
    assert!(first_page.last_key.is_some());

    // Page 2: starts from where page 1 stopped.
    let second_page = engine
        .scan(
            ScanParams::new()
                .with_limit(10)
                .with_start_key(first_page.last_key.unwrap()),
        )
        .unwrap();
    assert_eq!(second_page.items.len(), 10);

    // Page 3: starts from where page 2 stopped.
    let third_page = engine
        .scan(
            ScanParams::new()
                .with_limit(10)
                .with_start_key(second_page.last_key.unwrap()),
        )
        .unwrap();
    assert_eq!(third_page.items.len(), 10);
}
1598
/// Writes `COMPACTION_THRESHOLD` batches of `MEMTABLE_THRESHOLD` keys plus one
/// extra batch, then checks that a key from the first batch and a key from the
/// extra batch are both still readable.
///
/// The test does not inspect compaction statistics; it only checks that reads
/// still succeed after this volume of writes.
#[test]
fn test_lsm_compaction_triggered() {
    use crate::compaction::COMPACTION_THRESHOLD;

    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for batch in 0..COMPACTION_THRESHOLD {
        for seq in 0..MEMTABLE_THRESHOLD {
            let mut attrs = HashMap::new();
            attrs.insert("batch".to_string(), Value::number(batch as i64));
            attrs.insert("seq".to_string(), Value::number(seq as i64));

            let partition = format!("batch{:02}_key{:04}", batch, seq).into_bytes();
            engine.put(Key::new(partition), attrs).unwrap();
        }
    }

    // One more batch on top of the threshold-sized workload.
    for seq in 0..MEMTABLE_THRESHOLD {
        let mut attrs = HashMap::new();
        attrs.insert("final".to_string(), Value::number(1));

        let partition = format!("final_key{:04}", seq).into_bytes();
        engine.put(Key::new(partition), attrs).unwrap();
    }

    // Earliest write: must still be present and carry batch number 0.
    let earliest_key = Key::new(b"batch00_key0000".to_vec());
    let earliest = engine.get(&earliest_key).unwrap();
    assert!(earliest.is_some());
    let earliest = earliest.unwrap();
    assert_eq!(earliest.get("batch").unwrap(), &Value::N("0".to_string()));

    // A key from the extra batch must be present too.
    let latest_key = Key::new(b"final_key0000".to_vec());
    assert!(engine.get(&latest_key).unwrap().is_some());
}
1646
/// Writes 100 keys and flushes, deletes the first 50 and flushes again, then
/// checks that the deleted keys read as absent and the other 50 are present.
///
/// No compaction is invoked explicitly here; the test only flushes.
#[test]
fn test_lsm_compaction_removes_tombstones() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for n in 0..100 {
        let mut attrs = HashMap::new();
        attrs.insert("value".to_string(), Value::number(n));
        engine.put(Key::new(format!("key{:03}", n).into_bytes()), attrs).unwrap();
    }
    engine.flush().unwrap();

    for n in 0..50 {
        engine.delete(Key::new(format!("key{:03}", n).into_bytes())).unwrap();
    }
    engine.flush().unwrap();

    // Keys 0..50 were deleted; keys 50..100 were left untouched.
    for n in 0..100 {
        let key = Key::new(format!("key{:03}", n).into_bytes());
        let found = engine.get(&key).unwrap();
        if n < 50 {
            assert!(found.is_none());
        } else {
            assert!(found.is_some());
        }
    }
}
1685
/// Overwrites one key three times, flushing after each write, and expects reads
/// to return version 3 both before and after closing and reopening the engine.
#[test]
fn test_lsm_compaction_keeps_latest_version() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    let key = Key::new(b"test_key".to_vec());

    // Three successive versions of the same key, each followed by a flush.
    for version in 1..=3 {
        let mut attrs = HashMap::new();
        attrs.insert("version".to_string(), Value::number(version));
        engine.put(key.clone(), attrs).unwrap();
        engine.flush().unwrap();
    }

    let newest = Value::N("3".to_string());

    let before_reopen = engine.get(&key).unwrap().unwrap();
    assert_eq!(before_reopen.get("version").unwrap(), &newest);

    // Close the engine and open the same directory again.
    drop(engine);
    let reopened = LsmEngine::open(tmp.path()).unwrap();

    let after_reopen = reopened.get(&key).unwrap().unwrap();
    assert_eq!(after_reopen.get("version").unwrap(), &newest);
}
1728
/// Exercises the compaction-config accessors: a new engine reports compaction
/// enabled with an SST threshold of 10, `CompactionConfig::disabled()` turns it
/// off, and a custom builder config is returned as set.
#[test]
fn test_compaction_configuration() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    // Configuration of a freshly created engine.
    let initial = engine.compaction_config();
    assert!(initial.enabled);
    assert_eq!(initial.sst_threshold, 10);

    // Switching to the disabled preset.
    engine.set_compaction_config(CompactionConfig::disabled());
    let disabled = engine.compaction_config();
    assert!(!disabled.enabled);

    // Installing custom values.
    engine.set_compaction_config(
        CompactionConfig::new()
            .with_sst_threshold(5)
            .with_check_interval(30),
    );
    let tuned = engine.compaction_config();
    assert!(tuned.enabled);
    assert_eq!(tuned.sst_threshold, 5);
    assert_eq!(tuned.check_interval_secs, 30);
}
1755
/// Checks that compaction counters start at zero and become positive after
/// twelve `MEMTABLE_THRESHOLD`-sized batches are written under one partition
/// key.
#[test]
fn test_compaction_statistics() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    let initial = engine.compaction_stats();
    assert_eq!(initial.total_compactions, 0);
    assert_eq!(initial.total_ssts_merged, 0);
    assert_eq!(initial.active_compactions, 0);

    // All writes share one partition key and differ only in sort key.
    let partition = b"testdata";

    for batch in 0..12 {
        for seq in 0..MEMTABLE_THRESHOLD {
            let mut attrs = HashMap::new();
            attrs.insert("value".to_string(), Value::number(seq as i64));

            let sort_key = format!("batch{:02}_key{:04}", batch, seq).into_bytes();
            engine.put(Key::with_sk(partition.to_vec(), sort_key), attrs).unwrap();
        }
    }

    let after_writes = engine.compaction_stats();
    assert!(after_writes.total_compactions > 0, "Expected at least one compaction");
    assert!(after_writes.total_ssts_merged > 0, "Expected SSTs to be merged");
    assert!(after_writes.total_ssts_created > 0, "Expected new SSTs to be created");
}
1786
/// Lowers the SST threshold to 3, writes keys whose `stripe()` is 0 (stopping
/// early once `MEMTABLE_THRESHOLD * 4` have been written), flushes, and calls
/// `trigger_compaction(0)`.
///
/// The assertion is deliberately loose: the compaction count only has to be no
/// lower after the call than before it.
#[test]
fn test_manual_compaction_trigger() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    engine.set_compaction_config(CompactionConfig::new().with_sst_threshold(3));

    let target = MEMTABLE_THRESHOLD * 4;
    let mut written = 0;
    for n in 0..50000 {
        let key = Key::new(format!("key{:06}", n).into_bytes());
        // Only keys reporting stripe 0 are written; all others are skipped.
        if key.stripe() != 0 {
            continue;
        }

        let mut attrs = HashMap::new();
        attrs.insert("value".to_string(), Value::number(n));
        engine.put(key, attrs).unwrap();

        written += 1;
        if written >= target {
            break;
        }
    }

    engine.flush().unwrap();

    let before = engine.compaction_stats();
    engine.trigger_compaction(0).unwrap();
    let after = engine.compaction_stats();

    assert!(
        after.total_compactions >= before.total_compactions,
        "Compaction count should increase or stay the same"
    );
}
1827
/// With `CompactionConfig::disabled()` installed, writes fifteen
/// `MEMTABLE_THRESHOLD`-sized batches, flushes, and expects the compaction
/// counter to stay at zero.
#[test]
fn test_compaction_disabled() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    engine.set_compaction_config(CompactionConfig::disabled());

    for batch in 0..15 {
        for seq in 0..MEMTABLE_THRESHOLD {
            let mut attrs = HashMap::new();
            attrs.insert("value".to_string(), Value::number(seq as i64));

            let partition = format!("batch{:02}_key{:04}", batch, seq).into_bytes();
            engine.put(Key::new(partition), attrs).unwrap();
        }
    }

    engine.flush().unwrap();

    assert_eq!(
        engine.compaction_stats().total_compactions,
        0,
        "No compactions should occur when disabled"
    );
}
1853
/// Writes 200 keys and flushes, deletes the first 100 and flushes, then writes
/// twelve further `MEMTABLE_THRESHOLD`-sized batches of unrelated keys. After
/// that, the deleted keys must still read as absent and the surviving 100 must
/// still be present.
///
/// Despite the name, disk usage is not measured; only read visibility is
/// checked.
#[test]
fn test_compaction_with_deletes_reclaims_space() {
    let tmp = TempDir::new().unwrap();
    let engine = LsmEngine::create(tmp.path()).unwrap();

    for n in 0..200 {
        let mut attrs = HashMap::new();
        attrs.insert("value".to_string(), Value::number(n));
        engine.put(Key::new(format!("key{:03}", n).into_bytes()), attrs).unwrap();
    }
    engine.flush().unwrap();

    for n in 0..100 {
        engine.delete(Key::new(format!("key{:03}", n).into_bytes())).unwrap();
    }
    engine.flush().unwrap();

    // Additional write volume using a key shape distinct from the keys above.
    for batch in 0..12 {
        for n in 200..(200 + MEMTABLE_THRESHOLD) {
            let mut attrs = HashMap::new();
            attrs.insert("value".to_string(), Value::number(n as i64));

            let partition = format!("key{:06}_{:02}", n, batch).into_bytes();
            engine.put(Key::new(partition), attrs).unwrap();
        }
    }

    // Keys 0..100 were deleted; keys 100..200 were not.
    for n in 0..200 {
        let key = Key::new(format!("key{:03}", n).into_bytes());
        let found = engine.get(&key).unwrap();
        if n < 100 {
            assert!(found.is_none(), "Deleted key should not be found");
        } else {
            assert!(found.is_some(), "Non-deleted key should still exist");
        }
    }
}
1903}