kstone_core/
lsm.rs

1use crate::{Error, Result, Record, Key, Item, SeqNo, Value, wal::Wal, sst::{SstWriter, SstReader}};
2use crate::iterator::{QueryParams, QueryResult, ScanParams, ScanResult};
3use crate::expression::{UpdateAction, UpdateExecutor, ExpressionContext, Expr, ExpressionEvaluator};
4use crate::index::{TableSchema, encode_index_key, decode_index_key};
5use crate::compaction::{CompactionManager, CompactionConfig, CompactionStatsAtomic};
6use crate::config::DatabaseConfig;
7use bytes::Bytes;
8use parking_lot::RwLock;
9use std::collections::BTreeMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::fs;
13
/// Fixed record-count threshold for flushing a stripe's memtable.
const MEMTABLE_THRESHOLD: usize = 1000; // Flush after 1000 records per stripe
/// Number of stripes the engine is partitioned into; the constructors in this
/// file allocate exactly this many `Stripe`s.
const NUM_STRIPES: usize = 256;
16
/// LSM engine with 256-way striping (Phase 1.6+)
///
/// A handle around the shared engine state: all state lives in a single
/// `LsmInner` behind one `RwLock`, so every operation in this file starts by
/// taking either the read or the write side of that lock.
pub struct LsmEngine {
    inner: Arc<RwLock<LsmInner>>,
}
21
/// A single stripe in the LSM tree
///
/// Each stripe owns its own in-memory table plus the on-disk SSTs that were
/// loaded for it. Keys are routed to a stripe via `Key::stripe()`.
struct Stripe {
    memtable: BTreeMap<Vec<u8>, Record>, // Sorted by encoded key
    memtable_size_bytes: usize,          // Approximate size in bytes
    ssts: Vec<SstReader>,                 // Newest first
}
28
29impl Stripe {
30    fn new() -> Self {
31        Self {
32            memtable: BTreeMap::new(),
33            memtable_size_bytes: 0,
34            ssts: Vec::new(),
35        }
36    }
37
38    /// Estimate the size of a record in bytes
39    fn estimate_record_size(key_enc: &[u8], record: &Record) -> usize {
40        let mut size = key_enc.len(); // Key size
41        size += std::mem::size_of::<SeqNo>(); // Sequence number
42
43        // Estimate item size
44        if let Some(item) = &record.value {
45            for (attr_name, value) in item {
46                size += attr_name.len();
47                size += match value {
48                    Value::S(s) => s.len(),
49                    Value::N(n) => n.len(),
50                    Value::B(b) => b.len(),
51                    Value::Bool(_) => 1,
52                    Value::Null => 0,
53                    Value::Ts(_) => 8,
54                    Value::L(list) => {
55                        // Rough estimate for lists
56                        list.len() * 32 // Assume average 32 bytes per item
57                    }
58                    Value::M(map) => {
59                        // Rough estimate for maps
60                        map.len() * 64 // Assume average 64 bytes per entry
61                    }
62                    Value::VecF32(vec) => {
63                        // f32 vectors: 4 bytes per element
64                        vec.len() * 4
65                    }
66                };
67            }
68        }
69
70        size
71    }
72}
73
/// Engine state guarded by the `RwLock` inside `LsmEngine`.
///
/// Holds the write-ahead log, the per-stripe memtables/SSTs, the global
/// sequence and SST-id counters, and the schema/configuration the engine was
/// created or opened with.
struct LsmInner {
    dir: PathBuf,
    wal: Wal,
    stripes: Vec<Stripe>,  // 256 stripes
    next_seq: SeqNo,       // Global sequence number
    next_sst_id: u64,      // Global SST ID counter
    schema: TableSchema,   // Index definitions (Phase 3.1+)
    stream_buffer: std::collections::VecDeque<crate::stream::StreamRecord>,  // Stream records (Phase 3.4+)
    compaction_config: CompactionConfig,  // Compaction configuration (Phase 1.7+)
    compaction_stats: CompactionStatsAtomic,  // Compaction statistics (Phase 1.7+)
    config: DatabaseConfig,  // Database configuration (Phase 8+)
}
86
/// Transaction write operation (Phase 2.7+)
///
/// One entry of a `transact_write` request. The target key is supplied
/// alongside the operation by the caller; the operation itself only carries
/// the payload and an optional (or, for `ConditionCheck`, mandatory)
/// condition expression.
#[derive(Debug, Clone)]
pub enum TransactWriteOperation {
    /// Put an item with optional condition
    Put {
        item: Item,
        condition: Option<Expr>,
    },
    /// Update an item with optional condition
    Update {
        actions: Vec<UpdateAction>,
        condition: Option<Expr>,
    },
    /// Delete an item with optional condition
    Delete {
        condition: Option<Expr>,
    },
    /// Condition check only (no write)
    ConditionCheck {
        condition: Expr,
    },
}
109
110impl TransactWriteOperation {
111    /// Get the condition expression if present
112    pub fn condition(&self) -> Option<&Expr> {
113        match self {
114            Self::Put { condition, .. } => condition.as_ref(),
115            Self::Update { condition, .. } => condition.as_ref(),
116            Self::Delete { condition } => condition.as_ref(),
117            Self::ConditionCheck { condition } => Some(condition),
118        }
119    }
120}
121
122impl LsmInner {
123    /// Check if a stripe needs to flush based on configured limits
124    fn should_flush_stripe(&self, stripe_id: usize) -> bool {
125        let stripe = &self.stripes[stripe_id];
126
127        // Check record count limit
128        if stripe.memtable.len() >= self.config.max_memtable_records {
129            return true;
130        }
131
132        // Check byte size limit if configured
133        if let Some(max_bytes) = self.config.max_memtable_size_bytes {
134            if stripe.memtable_size_bytes >= max_bytes {
135                return true;
136            }
137        }
138
139        false
140    }
141
142    /// Insert a record into a stripe's memtable, tracking size
143    fn insert_into_memtable(&mut self, stripe_id: usize, key_enc: Vec<u8>, record: Record) {
144        let record_size = Stripe::estimate_record_size(&key_enc, &record);
145
146        // If key already exists, subtract old size first
147        if let Some(old_record) = self.stripes[stripe_id].memtable.get(&key_enc) {
148            let old_size = Stripe::estimate_record_size(&key_enc, old_record);
149            self.stripes[stripe_id].memtable_size_bytes =
150                self.stripes[stripe_id].memtable_size_bytes.saturating_sub(old_size);
151        }
152
153        self.stripes[stripe_id].memtable.insert(key_enc, record);
154        self.stripes[stripe_id].memtable_size_bytes += record_size;
155    }
156}
157
158impl LsmEngine {
159    /// Create a new database
160    pub fn create(dir: impl AsRef<Path>) -> Result<Self> {
161        Self::create_with_schema(dir, TableSchema::new())
162    }
163
164    /// Create a new database with a table schema (Phase 3.1+)
165    pub fn create_with_schema(dir: impl AsRef<Path>, schema: TableSchema) -> Result<Self> {
166        Self::create_with_config(dir, DatabaseConfig::default(), schema)
167    }
168
169    /// Create a new database with custom configuration (Phase 8+)
170    pub fn create_with_config(
171        dir: impl AsRef<Path>,
172        config: DatabaseConfig,
173        schema: TableSchema,
174    ) -> Result<Self> {
175        // Validate configuration
176        config.validate().map_err(|e| Error::InvalidArgument(e))?;
177
178        let dir = dir.as_ref();
179        fs::create_dir_all(dir)?;
180
181        let wal_path = dir.join("wal.log");
182        if wal_path.exists() {
183            return Err(Error::AlreadyExists(dir.display().to_string()));
184        }
185
186        let wal = Wal::create(&wal_path)?;
187
188        // Initialize 256 stripes
189        let stripes = (0..NUM_STRIPES).map(|_| Stripe::new()).collect();
190
191        Ok(Self {
192            inner: Arc::new(RwLock::new(LsmInner {
193                dir: dir.to_path_buf(),
194                wal,
195                stripes,
196                next_seq: 1,
197                next_sst_id: 1,
198                schema,
199                stream_buffer: std::collections::VecDeque::new(),
200                compaction_config: CompactionConfig::default(),
201                compaction_stats: CompactionStatsAtomic::new(),
202                config,
203            })),
204        })
205    }
206
207    /// Open existing database
208    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
209        let dir = dir.as_ref();
210        let wal_path = dir.join("wal.log");
211
212        let wal = Wal::open(&wal_path)?;
213
214        // Initialize 256 stripes
215        let mut stripes: Vec<Stripe> = (0..NUM_STRIPES).map(|_| Stripe::new()).collect();
216        let mut max_sst_id = 0u64;
217
218        // Load existing SSTs into appropriate stripes
219        for entry in fs::read_dir(dir)? {
220            let entry = entry?;
221            let path = entry.path();
222            if let Some(ext) = path.extension() {
223                if ext == "sst" {
224                    if let Some(stem) = path.file_stem() {
225                        if let Some(name) = stem.to_str() {
226                            // Parse filename: {stripe:03}-{sst_id}.sst or legacy {sst_id}.sst
227                            if let Some((stripe_str, id_str)) = name.split_once('-') {
228                                // New format: stripe-id
229                                if let (Ok(stripe), Ok(id)) = (stripe_str.parse::<usize>(), id_str.parse::<u64>()) {
230                                    if stripe < NUM_STRIPES {
231                                        max_sst_id = max_sst_id.max(id);
232                                        let reader = SstReader::open(&path)?;
233                                        stripes[stripe].ssts.push(reader);
234                                    }
235                                }
236                            } else {
237                                // Legacy format: just id (assign to stripe 0)
238                                if let Ok(id) = name.parse::<u64>() {
239                                    max_sst_id = max_sst_id.max(id);
240                                    let reader = SstReader::open(&path)?;
241                                    stripes[0].ssts.push(reader);
242                                }
243                            }
244                        }
245                    }
246                }
247            }
248        }
249
250        // Sort SSTs within each stripe (newest first)
251        for stripe in &mut stripes {
252            stripe.ssts.reverse();
253        }
254
255        // Recover from WAL
256        let records = wal.read_all()?;
257        let mut max_seq = 0;
258
259        for (_lsn, record) in records {
260            max_seq = max_seq.max(record.seq);
261            let key_enc = record.key.encode().to_vec();
262            let stripe_id = record.key.stripe() as usize;
263            stripes[stripe_id].memtable.insert(key_enc, record);
264        }
265
266        Ok(Self {
267            inner: Arc::new(RwLock::new(LsmInner {
268                dir: dir.to_path_buf(),
269                wal,
270                stripes,
271                next_seq: max_seq + 1,
272                next_sst_id: max_sst_id + 1,
273                schema: TableSchema::new(), // TODO: Load from manifest in future
274                stream_buffer: std::collections::VecDeque::new(),
275                compaction_config: CompactionConfig::default(),
276                compaction_stats: CompactionStatsAtomic::new(),
277                config: DatabaseConfig::default(), // TODO: Load from manifest in future
278            })),
279        })
280    }
281
282    /// Put an item
283    pub fn put(&self, key: Key, item: Item) -> Result<()> {
284        let mut inner = self.inner.write();
285
286        // Check if item exists (for stream record) (Phase 3.4+)
287        let old_image = if inner.schema.stream_config.enabled {
288            let stripe_id = key.stripe() as usize;
289            let key_enc = key.encode().to_vec();
290            inner.stripes[stripe_id].memtable.get(&key_enc).and_then(|r| r.value.clone())
291        } else {
292            None
293        };
294
295        let seq = inner.next_seq;
296        inner.next_seq += 1;
297
298        let record = Record::put(key.clone(), item.clone(), seq);
299
300        // Write to WAL
301        inner.wal.append(record.clone())?;
302        inner.wal.flush()?;
303
304        // Route to correct stripe
305        let stripe_id = record.key.stripe() as usize;
306        let key_enc = record.key.encode().to_vec();
307        inner.insert_into_memtable(stripe_id, key_enc, record);
308
309        // Materialize LSI entries (Phase 3.1+)
310        if !inner.schema.local_indexes.is_empty() {
311            self.materialize_lsi_entries(&mut inner, &key, &item)?;
312        }
313
314        // Materialize GSI entries (Phase 3.2+)
315        if !inner.schema.global_indexes.is_empty() {
316            self.materialize_gsi_entries(&mut inner, &key, &item)?;
317        }
318
319        // Emit stream record (Phase 3.4+)
320        if inner.schema.stream_config.enabled {
321            let stream_record = if let Some(old) = old_image {
322                crate::stream::StreamRecord::modify(
323                    seq,
324                    key.clone(),
325                    old,
326                    item.clone(),
327                    inner.schema.stream_config.view_type,
328                )
329            } else {
330                crate::stream::StreamRecord::insert(
331                    seq,
332                    key.clone(),
333                    item.clone(),
334                    inner.schema.stream_config.view_type,
335                )
336            };
337            self.emit_stream_record(&mut inner, stream_record);
338        }
339
340        // Check if this stripe needs to flush
341        if inner.should_flush_stripe(stripe_id) {
342            self.flush_stripe(&mut inner, stripe_id)?;
343        }
344
345        Ok(())
346    }
347
348    /// Put an item with a condition expression (Phase 2.5+)
349    pub fn put_conditional(&self, key: Key, item: Item, condition: &Expr, context: &ExpressionContext) -> Result<()> {
350        // Get current item (if exists)
351        let current_item = self.get(&key)?.unwrap_or_else(|| std::collections::HashMap::new());
352
353        // Evaluate condition
354        let evaluator = ExpressionEvaluator::new(&current_item, context);
355        let condition_passed = evaluator.evaluate(condition)?;
356
357        if !condition_passed {
358            return Err(Error::ConditionalCheckFailed("Put condition failed".into()));
359        }
360
361        // Condition passed, proceed with put
362        self.put(key, item)
363    }
364
365    /// Get an item
366    pub fn get(&self, key: &Key) -> Result<Option<Item>> {
367        let inner = self.inner.read();
368
369        // Route to correct stripe
370        let stripe_id = key.stripe() as usize;
371        let stripe = &inner.stripes[stripe_id];
372        let key_enc = key.encode().to_vec();
373
374        // Check stripe's memtable first
375        if let Some(record) = stripe.memtable.get(&key_enc) {
376            if let Some(item) = &record.value {
377                // Check TTL (Phase 3.3+)
378                if inner.schema.is_expired(item) {
379                    // Item is expired - perform lazy deletion
380                    drop(inner); // Release read lock
381                    self.delete(key.clone())?;
382                    return Ok(None);
383                }
384            }
385            return Ok(record.value.clone());
386        }
387
388        // Check stripe's SSTs (newest to oldest)
389        for sst in &stripe.ssts {
390            if let Some(record) = sst.get(key) {
391                if let Some(item) = &record.value {
392                    // Check TTL (Phase 3.3+)
393                    if inner.schema.is_expired(item) {
394                        // Item is expired - perform lazy deletion
395                        drop(inner); // Release read lock
396                        self.delete(key.clone())?;
397                        return Ok(None);
398                    }
399                }
400                return Ok(record.value.clone());
401            }
402        }
403
404        Ok(None)
405    }
406
407    /// Delete an item
408    pub fn delete(&self, key: Key) -> Result<()> {
409        let mut inner = self.inner.write();
410
411        // Check if item exists (for stream record) (Phase 3.4+)
412        let old_image = if inner.schema.stream_config.enabled {
413            let stripe_id = key.stripe() as usize;
414            let key_enc = key.encode().to_vec();
415            inner.stripes[stripe_id].memtable.get(&key_enc).and_then(|r| r.value.clone())
416        } else {
417            None
418        };
419
420        let seq = inner.next_seq;
421        inner.next_seq += 1;
422
423        let record = Record::delete(key.clone(), seq);
424
425        // Write to WAL
426        inner.wal.append(record.clone())?;
427        inner.wal.flush()?;
428
429        // Route to correct stripe
430        let stripe_id = record.key.stripe() as usize;
431        let key_enc = record.key.encode().to_vec();
432        inner.stripes[stripe_id].memtable.insert(key_enc, record);
433
434        // Emit stream record (Phase 3.4+)
435        if inner.schema.stream_config.enabled {
436            if let Some(old) = old_image {
437                let stream_record = crate::stream::StreamRecord::remove(
438                    seq,
439                    key.clone(),
440                    old,
441                    inner.schema.stream_config.view_type,
442                );
443                self.emit_stream_record(&mut inner, stream_record);
444            }
445        }
446
447        // Check if this stripe needs to flush
448        if inner.should_flush_stripe(stripe_id) {
449            self.flush_stripe(&mut inner, stripe_id)?;
450        }
451
452        Ok(())
453    }
454
455    /// Delete an item with a condition expression (Phase 2.5+)
456    pub fn delete_conditional(&self, key: Key, condition: &Expr, context: &ExpressionContext) -> Result<()> {
457        // Get current item
458        let current_item = self.get(&key)?.unwrap_or_else(|| std::collections::HashMap::new());
459
460        // Evaluate condition
461        let evaluator = ExpressionEvaluator::new(&current_item, context);
462        let condition_passed = evaluator.evaluate(condition)?;
463
464        if !condition_passed {
465            return Err(Error::ConditionalCheckFailed("Delete condition failed".into()));
466        }
467
468        // Condition passed, proceed with delete
469        self.delete(key)
470    }
471
472    /// Update an item using update expression (Phase 2.4+)
473    pub fn update(&self, key: &Key, actions: &[UpdateAction], context: &ExpressionContext) -> Result<Item> {
474        // First, get the current item (or create empty if doesn't exist)
475        let current_item = self.get(key)?.unwrap_or_else(|| std::collections::HashMap::new());
476
477        // Execute update actions
478        let executor = UpdateExecutor::new(context);
479        let updated_item = executor.execute(&current_item, actions)?;
480
481        // Put the updated item
482        self.put(key.clone(), updated_item.clone())?;
483
484        Ok(updated_item)
485    }
486
487    /// Update an item with a condition expression (Phase 2.5+)
488    pub fn update_conditional(
489        &self,
490        key: &Key,
491        actions: &[UpdateAction],
492        condition: &Expr,
493        context: &ExpressionContext,
494    ) -> Result<Item> {
495        // Get current item (or create empty if doesn't exist)
496        let current_item = self.get(key)?.unwrap_or_else(|| std::collections::HashMap::new());
497
498        // Evaluate condition
499        let evaluator = ExpressionEvaluator::new(&current_item, context);
500        let condition_passed = evaluator.evaluate(condition)?;
501
502        if !condition_passed {
503            return Err(Error::ConditionalCheckFailed("Update condition failed".into()));
504        }
505
506        // Condition passed, execute update
507        let executor = UpdateExecutor::new(context);
508        let updated_item = executor.execute(&current_item, actions)?;
509
510        // Put the updated item
511        self.put(key.clone(), updated_item.clone())?;
512
513        Ok(updated_item)
514    }
515
516    /// Query items within a partition (Phase 2.1+)
517    pub fn query(&self, params: QueryParams) -> Result<QueryResult> {
518        let inner = self.inner.read();
519
520        // Route to correct stripe
521        let stripe_id = {
522            let temp_key = Key::new(params.pk.clone());
523            temp_key.stripe() as usize
524        };
525        let stripe = &inner.stripes[stripe_id];
526
527        let mut items = Vec::new();
528        let mut seen_keys: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
529        let mut scanned_count = 0;
530        let mut last_key = None;
531
532        // Collect all matching records from memtable and SSTs
533        // We need to merge them by key, taking the newest version (highest SeqNo)
534        let mut all_records: BTreeMap<Vec<u8>, Record> = BTreeMap::new();
535
536        // Check if this is an index query (Phase 3.1+)
537        let is_index_query = params.index_name.is_some();
538
539        // First, get records from memtable
540        for (key_enc, record) in &stripe.memtable {
541            if is_index_query {
542                // For index queries, check if this is an index key with matching index name and pk
543                if let Some(index_name) = &params.index_name {
544                    if let Some((idx_name, idx_pk, idx_sk)) = decode_index_key(key_enc) {
545                        // Check if index name matches
546                        if idx_name != *index_name {
547                            continue;
548                        }
549
550                        // Check if PK matches
551                        if idx_pk != params.pk {
552                            continue;
553                        }
554
555                        // Check index sort key condition
556                        if !params.matches_sk(&Some(idx_sk)) {
557                            continue;
558                        }
559
560                        all_records.insert(key_enc.clone(), record.clone());
561                    }
562                }
563            } else {
564                // Base table query
565                // Check if PK matches
566                if record.key.pk != params.pk {
567                    continue;
568                }
569
570                // Check sort key condition
571                if !params.matches_sk(&record.key.sk) {
572                    continue;
573                }
574
575                all_records.insert(key_enc.clone(), record.clone());
576            }
577        }
578
579        // Then, get records from SSTs (newer SSTs first)
580        for _sst in &stripe.ssts {
581            // TODO: Scan SST files for matching records
582            // For now, we only query from memtable
583            // SST scanning will be added when we implement SST iterators
584        }
585
586        // Convert to sorted vec based on direction
587        let mut sorted_records: Vec<(Vec<u8>, Record)> = all_records.into_iter().collect();
588
589        if !params.forward {
590            sorted_records.reverse();
591        }
592
593        // Apply pagination and limit
594        for (key_enc, record) in sorted_records {
595            // Skip based on pagination
596            if params.should_skip(&record.key) {
597                continue;
598            }
599
600            scanned_count += 1;
601
602            // Skip if we've already seen this key (newer version)
603            if seen_keys.contains(&key_enc) {
604                continue;
605            }
606            seen_keys.insert(key_enc);
607
608            // Skip tombstones
609            if record.value.is_none() {
610                continue;
611            }
612
613            // Check TTL and skip expired items (Phase 3.3+)
614            if let Some(ref item) = record.value {
615                if inner.schema.is_expired(item) {
616                    continue; // Skip expired items
617                }
618            }
619
620            last_key = Some(record.key.clone());
621
622            if let Some(item) = record.value {
623                items.push(item);
624
625                // Check limit
626                if let Some(limit) = params.limit {
627                    if items.len() >= limit {
628                        break;
629                    }
630                }
631            }
632        }
633
634        Ok(QueryResult::new(items, last_key, scanned_count))
635    }
636
637    /// Batch get multiple items (Phase 2.6+)
638    pub fn batch_get(&self, keys: &[Key]) -> Result<std::collections::HashMap<Key, Option<Item>>> {
639        let mut results = std::collections::HashMap::new();
640
641        for key in keys {
642            let item = self.get(key)?;
643            results.insert(key.clone(), item);
644        }
645
646        Ok(results)
647    }
648
649    /// Batch write multiple items (Phase 2.6+)
650    pub fn batch_write(&self, operations: &[(Key, Option<Item>)]) -> Result<usize> {
651        let mut processed = 0;
652
653        for (key, item_opt) in operations {
654            match item_opt {
655                Some(item) => {
656                    self.put(key.clone(), item.clone())?;
657                    processed += 1;
658                }
659                None => {
660                    self.delete(key.clone())?;
661                    processed += 1;
662                }
663            }
664        }
665
666        Ok(processed)
667    }
668
669    /// Transaction get - read multiple items atomically (Phase 2.7+)
670    pub fn transact_get(&self, keys: &[Key]) -> Result<Vec<Option<Item>>> {
671        // Hold read lock for consistent snapshot
672        let _inner = self.inner.read();
673
674        let mut items = Vec::new();
675        for key in keys {
676            let item = self.get(key)?;
677            items.push(item);
678        }
679
680        Ok(items)
681    }
682
683    /// Transaction write - write multiple items atomically with conditions (Phase 2.7+)
684    pub fn transact_write(
685        &self,
686        operations: &[(Key, TransactWriteOperation)],
687        context: &ExpressionContext,
688    ) -> Result<usize> {
689        // Acquire write lock for atomicity
690        let mut inner = self.inner.write();
691
692        // Phase 1: Read all items and check all conditions
693        let mut current_items: Vec<Option<Item>> = Vec::new();
694        for (key, op) in operations {
695            let item = {
696                let stripe_id = key.stripe() as usize;
697                let stripe = &inner.stripes[stripe_id];
698                let key_enc = key.encode().to_vec();
699
700                // Check memtable
701                if let Some(record) = stripe.memtable.get(&key_enc) {
702                    record.value.clone()
703                } else {
704                    // Check SSTs
705                    let mut found = None;
706                    for sst in &stripe.ssts {
707                        if let Some(record) = sst.get(key) {
708                            found = record.value.clone();
709                            break;
710                        }
711                    }
712                    found
713                }
714            };
715
716            current_items.push(item.clone());
717
718            // Check condition if present
719            if let Some(condition_expr) = op.condition() {
720                let current_item = item.unwrap_or_else(|| std::collections::HashMap::new());
721                let evaluator = ExpressionEvaluator::new(&current_item, context);
722                let condition_passed = evaluator.evaluate(condition_expr)?;
723
724                if !condition_passed {
725                    return Err(Error::TransactionCanceled(format!(
726                        "Condition failed for key {:?}",
727                        key
728                    )));
729                }
730            }
731        }
732
733        // Phase 2: All conditions passed, perform all writes
734        let mut committed = 0;
735        for (i, (key, op)) in operations.iter().enumerate() {
736            match op {
737                TransactWriteOperation::Put { item, .. } => {
738                    // Perform put (without going through public API to avoid nested locks)
739                    let seq = inner.next_seq;
740                    inner.next_seq += 1;
741                    let record = Record::put(key.clone(), item.clone(), seq);
742                    inner.wal.append(record.clone())?;
743                    inner.wal.flush()?;
744
745                    let stripe_id = record.key.stripe() as usize;
746                    let key_enc = record.key.encode().to_vec();
747                    inner.stripes[stripe_id].memtable.insert(key_enc, record);
748
749                    if inner.stripes[stripe_id].memtable.len() >= MEMTABLE_THRESHOLD {
750                        self.flush_stripe(&mut inner, stripe_id)?;
751                    }
752
753                    committed += 1;
754                }
755                TransactWriteOperation::Delete { .. } => {
756                    // Perform delete
757                    let seq = inner.next_seq;
758                    inner.next_seq += 1;
759                    let record = Record::delete(key.clone(), seq);
760                    inner.wal.append(record.clone())?;
761                    inner.wal.flush()?;
762
763                    let stripe_id = record.key.stripe() as usize;
764                    let key_enc = record.key.encode().to_vec();
765                    inner.stripes[stripe_id].memtable.insert(key_enc, record);
766
767                    if inner.stripes[stripe_id].memtable.len() >= MEMTABLE_THRESHOLD {
768                        self.flush_stripe(&mut inner, stripe_id)?;
769                    }
770
771                    committed += 1;
772                }
773                TransactWriteOperation::Update { actions, .. } => {
774                    // Perform update
775                    let current_item = current_items[i].clone().unwrap_or_else(|| std::collections::HashMap::new());
776                    let executor = UpdateExecutor::new(context);
777                    let updated_item = executor.execute(&current_item, actions)?;
778
779                    let seq = inner.next_seq;
780                    inner.next_seq += 1;
781                    let record = Record::put(key.clone(), updated_item, seq);
782                    inner.wal.append(record.clone())?;
783                    inner.wal.flush()?;
784
785                    let stripe_id = record.key.stripe() as usize;
786                    let key_enc = record.key.encode().to_vec();
787                    inner.stripes[stripe_id].memtable.insert(key_enc, record);
788
789                    if inner.stripes[stripe_id].memtable.len() >= MEMTABLE_THRESHOLD {
790                        self.flush_stripe(&mut inner, stripe_id)?;
791                    }
792
793                    committed += 1;
794                }
795                TransactWriteOperation::ConditionCheck { .. } => {
796                    // Condition already checked in phase 1, no write needed
797                    committed += 1;
798                }
799            }
800        }
801
802        Ok(committed)
803    }
804
805    /// Scan all items across all stripes (Phase 2.2+)
806    pub fn scan(&self, params: ScanParams) -> Result<ScanResult> {
807        let inner = self.inner.read();
808
809        // Collect all records from all stripes first, then sort globally
810        let mut all_records: BTreeMap<Vec<u8>, Record> = BTreeMap::new();
811
812        // Scan all stripes (or subset for parallel scans)
813        for stripe_id in 0..NUM_STRIPES {
814            // Skip stripes not assigned to this segment
815            if !params.should_scan_stripe(stripe_id) {
816                continue;
817            }
818
819            let stripe = &inner.stripes[stripe_id];
820
821            // Collect from stripe's memtable
822            for (key_enc, record) in &stripe.memtable {
823                // Skip tombstones
824                if record.value.is_none() {
825                    continue;
826                }
827
828                all_records.insert(key_enc.clone(), record.clone());
829            }
830
831            // TODO: Scan stripe's SSTs
832            // Will be added when we implement SST iterators
833        }
834
835        // Now apply pagination and limit on sorted records
836        let mut items = Vec::new();
837        let mut scanned_count = 0;
838        let mut last_key = None;
839
840        for (_, record) in all_records {
841            // Skip based on pagination
842            if params.should_skip(&record.key) {
843                continue;
844            }
845
846            scanned_count += 1;
847
848            // Check TTL and skip expired items (Phase 3.3+)
849            if let Some(ref item) = record.value {
850                if inner.schema.is_expired(item) {
851                    continue; // Skip expired items
852                }
853            }
854
855            last_key = Some(record.key.clone());
856
857            if let Some(item) = record.value {
858                items.push(item);
859
860                // Check limit
861                if let Some(limit) = params.limit {
862                    if items.len() >= limit {
863                        return Ok(ScanResult::new(items, last_key, scanned_count));
864                    }
865                }
866            }
867        }
868
869        Ok(ScanResult::new(items, last_key, scanned_count))
870    }
871
872    /// Materialize LSI entries for an item (Phase 3.1+)
873    fn materialize_lsi_entries(&self, inner: &mut LsmInner, key: &Key, item: &Item) -> Result<()> {
874        // For each LSI defined in the schema
875        for lsi in &inner.schema.local_indexes {
876            // Extract the index sort key value from the item
877            if let Some(index_sk_value) = item.get(&lsi.sort_key_attribute) {
878                // Convert Value to Bytes for the index key
879                let index_sk_bytes = match index_sk_value {
880                    Value::S(s) => Bytes::copy_from_slice(s.as_bytes()),
881                    Value::N(n) => Bytes::copy_from_slice(n.as_bytes()),
882                    Value::B(b) => b.clone(),
883                    Value::Bool(b) => Bytes::copy_from_slice(if *b { b"true" } else { b"false" }),
884                    Value::Ts(ts) => Bytes::copy_from_slice(&ts.to_le_bytes()),
885                    _ => continue, // Skip unsupported types for now
886                };
887
888                // Create index key
889                let index_key_encoded = encode_index_key(&lsi.name, &key.pk, &index_sk_bytes);
890
891                // Create index record with the full item (or projected attributes based on projection type)
892                let index_item = item.clone(); // For now, always store full item
893
894                // Create a synthetic Key from the encoded bytes
895                // Index records use the base table's PK + encoded index info
896                let index_key = Key::new(Bytes::copy_from_slice(&index_key_encoded));
897
898                let seq = inner.next_seq;
899                inner.next_seq += 1;
900
901                let index_record = Record::put(index_key, index_item, seq);
902
903                // Write index record to WAL
904                inner.wal.append(index_record.clone())?;
905
906                // Add to memtable (route to same stripe as base record for locality)
907                let stripe_id = key.stripe() as usize;
908                inner.stripes[stripe_id].memtable.insert(index_key_encoded, index_record);
909            }
910        }
911
912        Ok(())
913    }
914
915    /// Materialize GSI entries for an item (Phase 3.2+)
916    fn materialize_gsi_entries(&self, inner: &mut LsmInner, base_key: &Key, item: &Item) -> Result<()> {
917        // For each GSI defined in the schema
918        for gsi in &inner.schema.global_indexes {
919            // Extract the GSI partition key value from the item
920            if let Some(gsi_pk_value) = item.get(&gsi.partition_key_attribute) {
921                // Convert Value to Bytes for the GSI partition key
922                let gsi_pk_bytes = match gsi_pk_value {
923                    Value::S(s) => Bytes::copy_from_slice(s.as_bytes()),
924                    Value::N(n) => Bytes::copy_from_slice(n.as_bytes()),
925                    Value::B(b) => b.clone(),
926                    Value::Bool(b) => Bytes::copy_from_slice(if *b { b"true" } else { b"false" }),
927                    Value::Ts(ts) => Bytes::copy_from_slice(&ts.to_le_bytes()),
928                    _ => continue, // Skip unsupported types
929                };
930
931                // Extract the GSI sort key value (if defined)
932                let mut gsi_sk_bytes = if let Some(gsi_sk_attr) = &gsi.sort_key_attribute {
933                    if let Some(gsi_sk_value) = item.get(gsi_sk_attr) {
934                        match gsi_sk_value {
935                            Value::S(s) => Bytes::copy_from_slice(s.as_bytes()),
936                            Value::N(n) => Bytes::copy_from_slice(n.as_bytes()),
937                            Value::B(b) => b.clone(),
938                            Value::Bool(b) => Bytes::copy_from_slice(if *b { b"true" } else { b"false" }),
939                            Value::Ts(ts) => Bytes::copy_from_slice(&ts.to_le_bytes()),
940                            _ => Bytes::new(), // Use empty bytes for unsupported types
941                        }
942                    } else {
943                        continue; // Skip if sort key attribute doesn't exist
944                    }
945                } else {
946                    Bytes::new() // No sort key for this GSI
947                };
948
949                // For GSI, append base table PK to ensure uniqueness
950                // This allows multiple base items with same GSI PK+SK
951                let base_pk_encoded = base_key.encode();
952                let mut combined_sk = Vec::with_capacity(gsi_sk_bytes.len() + base_pk_encoded.len());
953                combined_sk.extend_from_slice(&gsi_sk_bytes);
954                combined_sk.extend_from_slice(&base_pk_encoded);
955                gsi_sk_bytes = Bytes::from(combined_sk);
956
957                // Create GSI index key
958                let index_key_encoded = encode_index_key(&gsi.name, &gsi_pk_bytes, &gsi_sk_bytes);
959
960                // Create index record with the full item
961                let index_item = item.clone(); // For now, always store full item
962
963                // Create a synthetic Key from the encoded bytes
964                let index_key = Key::new(Bytes::copy_from_slice(&index_key_encoded));
965
966                let seq = inner.next_seq;
967                inner.next_seq += 1;
968
969                let index_record = Record::put(index_key, index_item, seq);
970
971                // Write index record to WAL
972                inner.wal.append(index_record.clone())?;
973
974                // Add to memtable - IMPORTANT: route to stripe based on GSI PK value
975                // Use a temporary key with just the GSI PK to determine stripe
976                let gsi_stripe_key = Key::new(gsi_pk_bytes.clone());
977                let gsi_stripe_id = gsi_stripe_key.stripe() as usize;
978                inner.stripes[gsi_stripe_id].memtable.insert(index_key_encoded, index_record);
979            }
980        }
981
982        Ok(())
983    }
984
985    /// Read stream records (Phase 3.4+)
986    ///
987    /// Returns all stream records in the buffer, ordered by sequence number (oldest first).
988    /// Optionally filter to only records with sequence number > after_sequence_number.
989    pub fn read_stream(&self, after_sequence_number: Option<u64>) -> Result<Vec<crate::stream::StreamRecord>> {
990        let inner = self.inner.read();
991
992        if !inner.schema.stream_config.enabled {
993            return Ok(Vec::new());
994        }
995
996        let records: Vec<crate::stream::StreamRecord> = inner.stream_buffer
997            .iter()
998            .filter(|record| {
999                if let Some(after) = after_sequence_number {
1000                    record.sequence_number > after
1001                } else {
1002                    true
1003                }
1004            })
1005            .cloned()
1006            .collect();
1007
1008        Ok(records)
1009    }
1010
1011    /// Emit a stream record if streams are enabled (Phase 3.4+)
1012    fn emit_stream_record(&self, inner: &mut LsmInner, record: crate::stream::StreamRecord) {
1013        if !inner.schema.stream_config.enabled {
1014            return;
1015        }
1016
1017        // Add to buffer
1018        inner.stream_buffer.push_back(record);
1019
1020        // Trim buffer if it exceeds max size
1021        while inner.stream_buffer.len() > inner.schema.stream_config.buffer_size {
1022            inner.stream_buffer.pop_front();
1023        }
1024    }
1025
1026    /// Flush a specific stripe's memtable to SST
1027    fn flush_stripe(&self, inner: &mut LsmInner, stripe_id: usize) -> Result<()> {
1028        if inner.stripes[stripe_id].memtable.is_empty() {
1029            return Ok(());
1030        }
1031
1032        let sst_id = inner.next_sst_id;
1033        inner.next_sst_id += 1;
1034
1035        // Filename format: {stripe:03}-{sst_id}.sst
1036        let sst_path = inner.dir.join(format!("{:03}-{}.sst", stripe_id, sst_id));
1037
1038        // Write SST from stripe's memtable
1039        let mut writer = SstWriter::new();
1040        for record in inner.stripes[stripe_id].memtable.values() {
1041            writer.add(record.clone());
1042        }
1043        writer.finish(&sst_path)?;
1044
1045        // Load the new SST
1046        let reader = SstReader::open(&sst_path)?;
1047
1048        // Add to front (newest SST) of this stripe
1049        inner.stripes[stripe_id].ssts.insert(0, reader);
1050
1051        // Clear stripe's memtable
1052        inner.stripes[stripe_id].memtable.clear();
1053        inner.stripes[stripe_id].memtable_size_bytes = 0;
1054
1055        // Check if compaction is needed for this stripe (Phase 1.7+)
1056        if inner.compaction_config.enabled && inner.stripes[stripe_id].ssts.len() >= inner.compaction_config.sst_threshold {
1057            // Start compaction statistics tracking
1058            let _guard = inner.compaction_stats.start_compaction();
1059
1060            let compaction_mgr = CompactionManager::new(stripe_id, inner.dir.clone());
1061            let ssts_to_compact = &inner.stripes[stripe_id].ssts;
1062            let sst_count = ssts_to_compact.len();
1063
1064            // Allocate new SST ID for compacted file
1065            let compacted_sst_id = inner.next_sst_id;
1066            inner.next_sst_id += 1;
1067
1068            // Perform compaction
1069            let (new_sst, old_paths) = compaction_mgr.compact(ssts_to_compact, compacted_sst_id)?;
1070
1071            // Record statistics
1072            inner.compaction_stats.record_ssts_merged(sst_count as u64);
1073            inner.compaction_stats.record_ssts_created(1);
1074
1075            // Replace all SSTs with the compacted one
1076            inner.stripes[stripe_id].ssts.clear();
1077            inner.stripes[stripe_id].ssts.push(new_sst);
1078
1079            // Delete old SST files
1080            compaction_mgr.cleanup_old_ssts(old_paths)?;
1081        }
1082
1083        Ok(())
1084    }
1085
1086    /// Force flush all stripes (for testing/shutdown)
1087    pub fn flush(&self) -> Result<()> {
1088        let mut inner = self.inner.write();
1089
1090        // Flush all non-empty stripes
1091        for stripe_id in 0..NUM_STRIPES {
1092            if !inner.stripes[stripe_id].memtable.is_empty() {
1093                self.flush_stripe(&mut inner, stripe_id)?;
1094            }
1095        }
1096
1097        Ok(())
1098    }
1099
1100    /// Set compaction configuration (Phase 1.7+)
1101    ///
1102    /// # Examples
1103    ///
1104    /// ```no_run
1105    /// use kstone_core::{LsmEngine, CompactionConfig};
1106    /// use tempfile::TempDir;
1107    ///
1108    /// let dir = TempDir::new().unwrap();
1109    /// let db = LsmEngine::create(dir.path()).unwrap();
1110    ///
1111    /// // Disable compaction
1112    /// db.set_compaction_config(CompactionConfig::disabled());
1113    ///
1114    /// // Or customize compaction settings
1115    /// let config = CompactionConfig::new()
1116    ///     .with_sst_threshold(5)
1117    ///     .with_check_interval(30);
1118    /// db.set_compaction_config(config);
1119    /// ```
1120    pub fn set_compaction_config(&self, config: CompactionConfig) {
1121        let mut inner = self.inner.write();
1122        inner.compaction_config = config;
1123    }
1124
1125    /// Get current compaction configuration (Phase 1.7+)
1126    pub fn compaction_config(&self) -> CompactionConfig {
1127        let inner = self.inner.read();
1128        inner.compaction_config.clone()
1129    }
1130
1131    /// Get compaction statistics (Phase 1.7+)
1132    ///
1133    /// Returns a snapshot of current compaction statistics including:
1134    /// - Total number of compactions performed
1135    /// - SSTs merged and created
1136    /// - Bytes read, written, and reclaimed
1137    /// - Records deduplicated and tombstones removed
1138    ///
1139    /// # Examples
1140    ///
1141    /// ```no_run
1142    /// use kstone_core::LsmEngine;
1143    /// use tempfile::TempDir;
1144    ///
1145    /// let dir = TempDir::new().unwrap();
1146    /// let db = LsmEngine::create(dir.path()).unwrap();
1147    ///
1148    /// // Get compaction statistics
1149    /// let stats = db.compaction_stats();
1150    /// println!("Total compactions: {}", stats.total_compactions);
1151    /// println!("SSTs merged: {}", stats.total_ssts_merged);
1152    /// println!("Bytes reclaimed: {}", stats.total_bytes_reclaimed);
1153    /// ```
1154    pub fn compaction_stats(&self) -> crate::compaction::CompactionStats {
1155        let inner = self.inner.read();
1156        inner.compaction_stats.snapshot()
1157    }
1158
1159    /// Trigger manual compaction on a specific stripe (Phase 1.7+)
1160    ///
1161    /// This is primarily for testing or manual database maintenance.
1162    /// Compaction will only occur if the stripe has enough SSTs.
1163    pub fn trigger_compaction(&self, stripe_id: usize) -> Result<()> {
1164        if stripe_id >= NUM_STRIPES {
1165            return Err(Error::InvalidArgument(format!(
1166                "Invalid stripe_id: {}, must be < {}",
1167                stripe_id, NUM_STRIPES
1168            )));
1169        }
1170
1171        let mut inner = self.inner.write();
1172
1173        // Check if compaction is needed
1174        if inner.stripes[stripe_id].ssts.len() >= inner.compaction_config.sst_threshold {
1175            let _guard = inner.compaction_stats.start_compaction();
1176            let compaction_mgr = CompactionManager::new(stripe_id, inner.dir.clone());
1177
1178            let sst_count = inner.stripes[stripe_id].ssts.len();
1179            let compacted_sst_id = inner.next_sst_id;
1180            inner.next_sst_id += 1;
1181
1182            let ssts_to_compact = &inner.stripes[stripe_id].ssts;
1183            let (new_sst, old_paths) = compaction_mgr.compact(ssts_to_compact, compacted_sst_id)?;
1184
1185            inner.compaction_stats.record_ssts_merged(sst_count as u64);
1186            inner.compaction_stats.record_ssts_created(1);
1187
1188            inner.stripes[stripe_id].ssts.clear();
1189            inner.stripes[stripe_id].ssts.push(new_sst);
1190
1191            compaction_mgr.cleanup_old_ssts(old_paths)?;
1192        }
1193
1194        Ok(())
1195    }
1196}
1197
1198#[cfg(test)]
1199mod tests {
1200    use super::*;
1201    use crate::Value;
1202    use tempfile::TempDir;
1203    use std::collections::HashMap;
1204
1205    #[test]
1206    fn test_lsm_create() {
1207        let dir = TempDir::new().unwrap();
1208        let _db = LsmEngine::create(dir.path()).unwrap();
1209    }
1210
1211    #[test]
1212    fn test_lsm_put_get() {
1213        let dir = TempDir::new().unwrap();
1214        let db = LsmEngine::create(dir.path()).unwrap();
1215
1216        let key = Key::new(b"user#123".to_vec());
1217        let mut item = HashMap::new();
1218        item.insert("name".to_string(), Value::string("Alice"));
1219        item.insert("age".to_string(), Value::number(30));
1220
1221        db.put(key.clone(), item.clone()).unwrap();
1222
1223        let result = db.get(&key).unwrap();
1224        assert_eq!(result, Some(item));
1225    }
1226
1227    #[test]
1228    fn test_lsm_delete() {
1229        let dir = TempDir::new().unwrap();
1230        let db = LsmEngine::create(dir.path()).unwrap();
1231
1232        let key = Key::new(b"user#123".to_vec());
1233        let mut item = HashMap::new();
1234        item.insert("name".to_string(), Value::string("Bob"));
1235
1236        db.put(key.clone(), item).unwrap();
1237        assert!(db.get(&key).unwrap().is_some());
1238
1239        db.delete(key.clone()).unwrap();
1240        assert!(db.get(&key).unwrap().is_none());
1241    }
1242
1243    #[test]
1244    fn test_lsm_reopen() {
1245        let dir = TempDir::new().unwrap();
1246        let path = dir.path().to_path_buf();
1247
1248        let key = Key::new(b"persistent".to_vec());
1249        let mut item = HashMap::new();
1250        item.insert("data".to_string(), Value::string("test"));
1251
1252        // Write and close
1253        {
1254            let db = LsmEngine::create(&path).unwrap();
1255            db.put(key.clone(), item.clone()).unwrap();
1256        }
1257
1258        // Reopen and verify
1259        let db = LsmEngine::open(&path).unwrap();
1260        let result = db.get(&key).unwrap();
1261        assert_eq!(result, Some(item));
1262    }
1263
1264    #[test]
1265    fn test_lsm_flush() {
1266        let dir = TempDir::new().unwrap();
1267        let db = LsmEngine::create(dir.path()).unwrap();
1268
1269        // Write many items to trigger flush
1270        for i in 0..MEMTABLE_THRESHOLD + 10 {
1271            let key = Key::new(format!("key{}", i).into_bytes());
1272            let mut item = HashMap::new();
1273            item.insert("value".to_string(), Value::number(i));
1274            db.put(key, item).unwrap();
1275        }
1276
1277        // Verify all items are readable
1278        for i in 0..MEMTABLE_THRESHOLD + 10 {
1279            let key = Key::new(format!("key{}", i).into_bytes());
1280            let result = db.get(&key).unwrap();
1281            assert!(result.is_some());
1282        }
1283    }
1284
1285    #[test]
1286    fn test_lsm_overwrite() {
1287        let dir = TempDir::new().unwrap();
1288        let db = LsmEngine::create(dir.path()).unwrap();
1289
1290        let key = Key::new(b"counter".to_vec());
1291
1292        for i in 0..5 {
1293            let mut item = HashMap::new();
1294            item.insert("value".to_string(), Value::number(i));
1295            db.put(key.clone(), item).unwrap();
1296        }
1297
1298        let result = db.get(&key).unwrap().unwrap();
1299        match result.get("value").unwrap() {
1300            Value::N(n) => assert_eq!(n, "4"),
1301            _ => panic!("Expected number value"),
1302        }
1303    }
1304
1305    #[test]
1306    fn test_lsm_striping() {
1307        let dir = TempDir::new().unwrap();
1308        let db = LsmEngine::create(dir.path()).unwrap();
1309
1310        // Insert keys that should go to different stripes
1311        let mut keys_by_stripe: HashMap<u8, Vec<Key>> = HashMap::new();
1312
1313        for i in 0..1000 {
1314            let key = Key::new(format!("key{}", i).into_bytes());
1315            let stripe = key.stripe();
1316
1317            let mut item = HashMap::new();
1318            item.insert("id".to_string(), Value::number(i));
1319            db.put(key.clone(), item).unwrap();
1320
1321            keys_by_stripe.entry(stripe).or_insert_with(Vec::new).push(key);
1322        }
1323
1324        // Verify multiple stripes were used
1325        assert!(keys_by_stripe.len() > 1, "Expected keys to be distributed across multiple stripes");
1326
1327        // Verify all keys are readable
1328        for (stripe, keys) in keys_by_stripe {
1329            for key in keys {
1330                let result = db.get(&key).unwrap();
1331                assert!(result.is_some(), "Key should exist in stripe {}", stripe);
1332            }
1333        }
1334    }
1335
1336    #[test]
1337    fn test_lsm_stripe_independent_flush() {
1338        let dir = TempDir::new().unwrap();
1339        let db = LsmEngine::create(dir.path()).unwrap();
1340
1341        // Use composite keys with same PK but different SK - all go to same stripe
1342        let pk = b"user#123";
1343        let base_key = Key::new(pk.to_vec());
1344        let stripe = base_key.stripe();
1345
1346        // Fill this stripe's memtable using composite keys with sort keys
1347        for i in 0..MEMTABLE_THRESHOLD + 10 {
1348            let key = Key::with_sk(pk.to_vec(), format!("item#{}", i).into_bytes());
1349            // Verify same stripe (stripe is based on PK only)
1350            assert_eq!(key.stripe(), stripe);
1351
1352            let mut item = HashMap::new();
1353            item.insert("value".to_string(), Value::number(i));
1354            db.put(key, item).unwrap();
1355        }
1356
1357        // Force flush
1358        db.flush().unwrap();
1359
1360        // Check that SST file exists with stripe prefix
1361        let mut found_striped_sst = false;
1362        for entry in fs::read_dir(dir.path()).unwrap() {
1363            let entry = entry.unwrap();
1364            if let Some(name) = entry.file_name().to_str() {
1365                if name.ends_with(".sst") && name.starts_with(&format!("{:03}-", stripe)) {
1366                    found_striped_sst = true;
1367                    break;
1368                }
1369            }
1370        }
1371
1372        assert!(found_striped_sst, "Expected SST file with stripe prefix");
1373    }
1374
1375    #[test]
1376    fn test_lsm_query_basic() {
1377        use crate::iterator::QueryParams;
1378        use bytes::Bytes;
1379
1380        let dir = TempDir::new().unwrap();
1381        let db = LsmEngine::create(dir.path()).unwrap();
1382
1383        let pk = b"user#123";
1384
1385        // Insert items with different sort keys
1386        for i in 0..10 {
1387            let key = Key::with_sk(pk.to_vec(), format!("item#{:03}", i).into_bytes());
1388            let mut item = HashMap::new();
1389            item.insert("id".to_string(), Value::number(i));
1390            item.insert("name".to_string(), Value::string(format!("Item {}", i)));
1391            db.put(key, item).unwrap();
1392        }
1393
1394        // Query all items in partition
1395        let params = QueryParams::new(Bytes::from(pk.to_vec()));
1396        let result = db.query(params).unwrap();
1397
1398        assert_eq!(result.items.len(), 10);
1399        assert_eq!(result.scanned_count, 10);
1400    }
1401
1402    #[test]
1403    fn test_lsm_query_with_limit() {
1404        use crate::iterator::QueryParams;
1405        use bytes::Bytes;
1406
1407        let dir = TempDir::new().unwrap();
1408        let db = LsmEngine::create(dir.path()).unwrap();
1409
1410        let pk = b"user#456";
1411
1412        // Insert 20 items
1413        for i in 0..20 {
1414            let key = Key::with_sk(pk.to_vec(), format!("item#{:03}", i).into_bytes());
1415            let mut item = HashMap::new();
1416            item.insert("id".to_string(), Value::number(i));
1417            db.put(key, item).unwrap();
1418        }
1419
1420        // Query with limit of 5
1421        let params = QueryParams::new(Bytes::from(pk.to_vec())).with_limit(5);
1422        let result = db.query(params).unwrap();
1423
1424        assert_eq!(result.items.len(), 5);
1425        assert!(result.last_key.is_some());
1426    }
1427
1428    #[test]
1429    fn test_lsm_query_with_sk_condition() {
1430        use crate::iterator::{QueryParams, SortKeyCondition};
1431        use bytes::Bytes;
1432
1433        let dir = TempDir::new().unwrap();
1434        let db = LsmEngine::create(dir.path()).unwrap();
1435
1436        let pk = b"user#789";
1437
1438        // Insert items
1439        for i in 0..10 {
1440            let key = Key::with_sk(pk.to_vec(), format!("item#{:03}", i).into_bytes());
1441            let mut item = HashMap::new();
1442            item.insert("id".to_string(), Value::number(i));
1443            db.put(key, item).unwrap();
1444        }
1445
1446        // Query with SK begins_with condition
1447        let params = QueryParams::new(Bytes::from(pk.to_vec()))
1448            .with_sk_condition(SortKeyCondition::BeginsWith, Bytes::from("item#00"), None);
1449        let result = db.query(params).unwrap();
1450
1451        // Should match item#000 through item#009
1452        assert_eq!(result.items.len(), 10);
1453    }
1454
1455    #[test]
1456    fn test_lsm_query_reverse() {
1457        use crate::iterator::QueryParams;
1458        use bytes::Bytes;
1459
1460        let dir = TempDir::new().unwrap();
1461        let db = LsmEngine::create(dir.path()).unwrap();
1462
1463        let pk = b"user#999";
1464
1465        // Insert items
1466        for i in 0..5 {
1467            let key = Key::with_sk(pk.to_vec(), format!("item#{}", i).into_bytes());
1468            let mut item = HashMap::new();
1469            item.insert("id".to_string(), Value::number(i));
1470            db.put(key, item).unwrap();
1471        }
1472
1473        // Query in reverse order
1474        let params = QueryParams::new(Bytes::from(pk.to_vec())).with_direction(false);
1475        let result = db.query(params).unwrap();
1476
1477        assert_eq!(result.items.len(), 5);
1478        // First item should have highest ID when reversed
1479        if let Some(Value::N(n)) = result.items[0].get("id") {
1480            assert_eq!(n, "4");
1481        } else {
1482            panic!("Expected number value");
1483        }
1484    }
1485
1486    #[test]
1487    fn test_lsm_scan_basic() {
1488        use crate::iterator::ScanParams;
1489
1490        let dir = TempDir::new().unwrap();
1491        let db = LsmEngine::create(dir.path()).unwrap();
1492
1493        // Insert items across multiple partitions
1494        for i in 0..20 {
1495            let pk = format!("user#{}", i);
1496            let key = Key::new(pk.into_bytes());
1497            let mut item = HashMap::new();
1498            item.insert("id".to_string(), Value::number(i));
1499            db.put(key, item).unwrap();
1500        }
1501
1502        // Scan all items
1503        let params = ScanParams::new();
1504        let result = db.scan(params).unwrap();
1505
1506        assert_eq!(result.items.len(), 20);
1507        assert_eq!(result.scanned_count, 20);
1508    }
1509
1510    #[test]
1511    fn test_lsm_scan_with_limit() {
1512        use crate::iterator::ScanParams;
1513
1514        let dir = TempDir::new().unwrap();
1515        let db = LsmEngine::create(dir.path()).unwrap();
1516
1517        // Insert 50 items
1518        for i in 0..50 {
1519            let pk = format!("item#{:03}", i);
1520            let key = Key::new(pk.into_bytes());
1521            let mut item = HashMap::new();
1522            item.insert("value".to_string(), Value::number(i));
1523            db.put(key, item).unwrap();
1524        }
1525
1526        // Scan with limit
1527        let params = ScanParams::new().with_limit(10);
1528        let result = db.scan(params).unwrap();
1529
1530        assert_eq!(result.items.len(), 10);
1531        assert!(result.last_key.is_some());
1532    }
1533
1534    #[test]
1535    fn test_lsm_scan_parallel() {
1536        use crate::iterator::ScanParams;
1537
1538        let dir = TempDir::new().unwrap();
1539        let db = LsmEngine::create(dir.path()).unwrap();
1540
1541        // Insert items that will distribute across stripes
1542        for i in 0..100 {
1543            let pk = format!("key{}", i);
1544            let key = Key::new(pk.into_bytes());
1545            let mut item = HashMap::new();
1546            item.insert("value".to_string(), Value::number(i));
1547            db.put(key, item).unwrap();
1548        }
1549
1550        // Parallel scan with 4 segments
1551        let mut total_items = 0;
1552        for segment in 0..4 {
1553            let params = ScanParams::new().with_segment(segment, 4);
1554            let result = db.scan(params).unwrap();
1555            total_items += result.items.len();
1556        }
1557
1558        // All segments together should return all items
1559        assert_eq!(total_items, 100);
1560    }
1561
1562    #[test]
1563    fn test_lsm_scan_pagination() {
1564        use crate::iterator::ScanParams;
1565
1566        let dir = TempDir::new().unwrap();
1567        let db = LsmEngine::create(dir.path()).unwrap();
1568
1569        // Insert 30 items
1570        for i in 0..30 {
1571            let pk = format!("user#{:03}", i);
1572            let key = Key::new(pk.into_bytes());
1573            let mut item = HashMap::new();
1574            item.insert("id".to_string(), Value::number(i));
1575            db.put(key, item).unwrap();
1576        }
1577
1578        // First page
1579        let params1 = ScanParams::new().with_limit(10);
1580        let result1 = db.scan(params1).unwrap();
1581        assert_eq!(result1.items.len(), 10);
1582        assert!(result1.last_key.is_some());
1583
1584        // Second page
1585        let params2 = ScanParams::new()
1586            .with_limit(10)
1587            .with_start_key(result1.last_key.unwrap());
1588        let result2 = db.scan(params2).unwrap();
1589        assert_eq!(result2.items.len(), 10);
1590
1591        // Third page
1592        let params3 = ScanParams::new()
1593            .with_limit(10)
1594            .with_start_key(result2.last_key.unwrap());
1595        let result3 = db.scan(params3).unwrap();
1596        assert_eq!(result3.items.len(), 10);
1597    }
1598
1599    #[test]
1600    fn test_lsm_compaction_triggered() {
1601        use crate::compaction::COMPACTION_THRESHOLD;
1602
1603        let dir = TempDir::new().unwrap();
1604        let db = LsmEngine::create(dir.path()).unwrap();
1605
1606        // Force multiple flushes to trigger compaction
1607        // Each flush creates 1 SST, so we need COMPACTION_THRESHOLD flushes
1608        for batch in 0..COMPACTION_THRESHOLD {
1609            // Insert MEMTABLE_THRESHOLD records to trigger flush
1610            for i in 0..MEMTABLE_THRESHOLD {
1611                let key = Key::new(format!("batch{:02}_key{:04}", batch, i).into_bytes());
1612                let mut item = HashMap::new();
1613                item.insert("batch".to_string(), Value::number(batch as i64));
1614                item.insert("seq".to_string(), Value::number(i as i64));
1615                db.put(key, item).unwrap();
1616            }
1617
1618            // Verify flush happened (memtable should be empty after auto-flush)
1619        }
1620
1621        // Trigger one more flush to initiate compaction
1622        for i in 0..MEMTABLE_THRESHOLD {
1623            let key = Key::new(format!("final_key{:04}", i).into_bytes());
1624            let mut item = HashMap::new();
1625            item.insert("final".to_string(), Value::number(1));
1626            db.put(key, item).unwrap();
1627        }
1628
1629        // Verify data integrity after compaction
1630        // Read a key from the first batch
1631        let key1 = Key::new(b"batch00_key0000".to_vec());
1632        let result1 = db.get(&key1).unwrap();
1633        assert!(result1.is_some());
1634        let item1 = result1.unwrap();
1635        assert_eq!(item1.get("batch").unwrap(), &Value::N("0".to_string()));
1636
1637        // Read a key from the last batch
1638        let key2 = Key::new(b"final_key0000".to_vec());
1639        let result2 = db.get(&key2).unwrap();
1640        assert!(result2.is_some());
1641
1642        // Verify that compaction happened by checking SST count is reduced
1643        // After compaction, there should be fewer SSTs than COMPACTION_THRESHOLD
1644        // (This is implicit - if compaction didn't work, reads would still work but SST count would be high)
1645    }
1646
1647    #[test]
1648    fn test_lsm_compaction_removes_tombstones() {
1649        let dir = TempDir::new().unwrap();
1650        let db = LsmEngine::create(dir.path()).unwrap();
1651
1652        // Insert items
1653        for i in 0..100 {
1654            let key = Key::new(format!("key{:03}", i).into_bytes());
1655            let mut item = HashMap::new();
1656            item.insert("value".to_string(), Value::number(i));
1657            db.put(key, item).unwrap();
1658        }
1659
1660        // Force flush
1661        db.flush().unwrap();
1662
1663        // Delete half the items
1664        for i in 0..50 {
1665            let key = Key::new(format!("key{:03}", i).into_bytes());
1666            db.delete(key).unwrap();
1667        }
1668
1669        // Force another flush
1670        db.flush().unwrap();
1671
1672        // Verify deletes worked
1673        for i in 0..50 {
1674            let key = Key::new(format!("key{:03}", i).into_bytes());
1675            assert!(db.get(&key).unwrap().is_none());
1676        }
1677
1678        // Verify remaining items still exist
1679        for i in 50..100 {
1680            let key = Key::new(format!("key{:03}", i).into_bytes());
1681            let result = db.get(&key).unwrap();
1682            assert!(result.is_some());
1683        }
1684    }
1685
1686    #[test]
1687    fn test_lsm_compaction_keeps_latest_version() {
1688        let dir = TempDir::new().unwrap();
1689        let db = LsmEngine::create(dir.path()).unwrap();
1690
1691        // Insert initial version
1692        let key = Key::new(b"test_key".to_vec());
1693        let mut item1 = HashMap::new();
1694        item1.insert("version".to_string(), Value::number(1));
1695        db.put(key.clone(), item1).unwrap();
1696
1697        // Force flush
1698        db.flush().unwrap();
1699
1700        // Update to version 2
1701        let mut item2 = HashMap::new();
1702        item2.insert("version".to_string(), Value::number(2));
1703        db.put(key.clone(), item2).unwrap();
1704
1705        // Force flush
1706        db.flush().unwrap();
1707
1708        // Update to version 3
1709        let mut item3 = HashMap::new();
1710        item3.insert("version".to_string(), Value::number(3));
1711        db.put(key.clone(), item3).unwrap();
1712
1713        // Force flush
1714        db.flush().unwrap();
1715
1716        // Read should return latest version
1717        let result = db.get(&key).unwrap().unwrap();
1718        assert_eq!(result.get("version").unwrap(), &Value::N("3".to_string()));
1719
1720        // Reopen database (forces recovery and any pending compaction)
1721        drop(db);
1722        let db = LsmEngine::open(dir.path()).unwrap();
1723
1724        // Verify latest version is still there
1725        let result = db.get(&key).unwrap().unwrap();
1726        assert_eq!(result.get("version").unwrap(), &Value::N("3".to_string()));
1727    }
1728
1729    #[test]
1730    fn test_compaction_configuration() {
1731        let dir = TempDir::new().unwrap();
1732        let db = LsmEngine::create(dir.path()).unwrap();
1733
1734        // Check default config
1735        let config = db.compaction_config();
1736        assert!(config.enabled);
1737        assert_eq!(config.sst_threshold, 10);
1738
1739        // Disable compaction
1740        db.set_compaction_config(CompactionConfig::disabled());
1741        let config = db.compaction_config();
1742        assert!(!config.enabled);
1743
1744        // Enable with custom settings
1745        let custom_config = CompactionConfig::new()
1746            .with_sst_threshold(5)
1747            .with_check_interval(30);
1748        db.set_compaction_config(custom_config);
1749
1750        let config = db.compaction_config();
1751        assert!(config.enabled);
1752        assert_eq!(config.sst_threshold, 5);
1753        assert_eq!(config.check_interval_secs, 30);
1754    }
1755
1756    #[test]
1757    fn test_compaction_statistics() {
1758        let dir = TempDir::new().unwrap();
1759        let db = LsmEngine::create(dir.path()).unwrap();
1760
1761        // Initial stats should be zero
1762        let stats = db.compaction_stats();
1763        assert_eq!(stats.total_compactions, 0);
1764        assert_eq!(stats.total_ssts_merged, 0);
1765        assert_eq!(stats.active_compactions, 0);
1766
1767        // Use same PK prefix to ensure all keys go to same stripe
1768        let pk = b"testdata";
1769
1770        // Write 12 batches of data - all keys use same PK to route to same stripe
1771        for batch in 0..12 {
1772            for i in 0..MEMTABLE_THRESHOLD {
1773                let key = Key::with_sk(pk.to_vec(), format!("batch{:02}_key{:04}", batch, i).into_bytes());
1774                let mut item = HashMap::new();
1775                item.insert("value".to_string(), Value::number(i as i64));
1776                db.put(key, item).unwrap();
1777            }
1778        }
1779
1780        // Check that compaction happened (12 SSTs in one stripe > 10 threshold)
1781        let stats = db.compaction_stats();
1782        assert!(stats.total_compactions > 0, "Expected at least one compaction");
1783        assert!(stats.total_ssts_merged > 0, "Expected SSTs to be merged");
1784        assert!(stats.total_ssts_created > 0, "Expected new SSTs to be created");
1785    }
1786
1787    #[test]
1788    fn test_manual_compaction_trigger() {
1789        let dir = TempDir::new().unwrap();
1790        let db = LsmEngine::create(dir.path()).unwrap();
1791
1792        // Set a low threshold for testing
1793        db.set_compaction_config(CompactionConfig::new().with_sst_threshold(3));
1794
1795        // Write enough data to create multiple SSTs in stripe 0
1796        // Use keys that hash to stripe 0
1797        let mut count = 0;
1798        for i in 0..50000 {
1799            let key = Key::new(format!("key{:06}", i).into_bytes());
1800            if key.stripe() == 0 {
1801                let mut item = HashMap::new();
1802                item.insert("value".to_string(), Value::number(i));
1803                db.put(key, item).unwrap();
1804                count += 1;
1805                if count >= MEMTABLE_THRESHOLD * 4 {
1806                    break;
1807                }
1808            }
1809        }
1810
1811        // Force flush
1812        db.flush().unwrap();
1813
1814        // Get initial stats
1815        let stats_before = db.compaction_stats();
1816
1817        // Manually trigger compaction on stripe 0
1818        db.trigger_compaction(0).unwrap();
1819
1820        // Stats should have changed
1821        let stats_after = db.compaction_stats();
1822        assert!(
1823            stats_after.total_compactions >= stats_before.total_compactions,
1824            "Compaction count should increase or stay the same"
1825        );
1826    }
1827
1828    #[test]
1829    fn test_compaction_disabled() {
1830        let dir = TempDir::new().unwrap();
1831        let db = LsmEngine::create(dir.path()).unwrap();
1832
1833        // Disable compaction
1834        db.set_compaction_config(CompactionConfig::disabled());
1835
1836        // Write lots of data
1837        for batch in 0..15 {
1838            for i in 0..MEMTABLE_THRESHOLD {
1839                let key = Key::new(format!("batch{:02}_key{:04}", batch, i).into_bytes());
1840                let mut item = HashMap::new();
1841                item.insert("value".to_string(), Value::number(i as i64));
1842                db.put(key, item).unwrap();
1843            }
1844        }
1845
1846        // Force flush
1847        db.flush().unwrap();
1848
1849        // Stats should show no compactions
1850        let stats = db.compaction_stats();
1851        assert_eq!(stats.total_compactions, 0, "No compactions should occur when disabled");
1852    }
1853
1854    #[test]
1855    fn test_compaction_with_deletes_reclaims_space() {
1856        let dir = TempDir::new().unwrap();
1857        let db = LsmEngine::create(dir.path()).unwrap();
1858
1859        // Insert many items
1860        for i in 0..200 {
1861            let key = Key::new(format!("key{:03}", i).into_bytes());
1862            let mut item = HashMap::new();
1863            item.insert("value".to_string(), Value::number(i));
1864            db.put(key, item).unwrap();
1865        }
1866
1867        // Force flush
1868        db.flush().unwrap();
1869
1870        // Delete half of them
1871        for i in 0..100 {
1872            let key = Key::new(format!("key{:03}", i).into_bytes());
1873            db.delete(key).unwrap();
1874        }
1875
1876        // Force flush to create tombstones in SST
1877        db.flush().unwrap();
1878
1879        // Trigger more writes to cause compaction
1880        for batch in 0..12 {
1881            for i in 200..(200 + MEMTABLE_THRESHOLD) {
1882                let key = Key::new(format!("key{:06}_{:02}", i, batch).into_bytes());
1883                let mut item = HashMap::new();
1884                item.insert("value".to_string(), Value::number(i as i64));
1885                db.put(key, item).unwrap();
1886            }
1887        }
1888
1889        // Verify deleted items are gone
1890        for i in 0..100 {
1891            let key = Key::new(format!("key{:03}", i).into_bytes());
1892            let result = db.get(&key).unwrap();
1893            assert!(result.is_none(), "Deleted key should not be found");
1894        }
1895
1896        // Verify non-deleted items still exist
1897        for i in 100..200 {
1898            let key = Key::new(format!("key{:03}", i).into_bytes());
1899            let result = db.get(&key).unwrap();
1900            assert!(result.is_some(), "Non-deleted key should still exist");
1901        }
1902    }
1903}