// kotoba_db_engine_lsm/lib.rs

//! LSM-Tree based storage engine for KotobaDB.
//!
//! This engine implements a Log-Structured Merge-Tree (LSM-Tree) architecture,
//! providing high-performance storage with efficient writes and optimized reads.
//!
//! Features:
//! - Write-Ahead Log (WAL) for durability
//! - MemTable for in-memory buffering
//! - SSTable files for persistent storage
//! - Background compaction for performance optimization
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::Result;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;

use kotoba_db_core::engine::StorageEngine;
19
/// A space-efficient probabilistic data structure for testing set membership.
///
/// May report false positives but never false negatives: if `might_contain`
/// returns `false`, the item was definitely never added.
struct BloomFilter {
    /// The bit array, packed 8 bits per byte (bit i lives at byte i/8, offset i%8)
    bits: Vec<u8>,
    /// Number of hash functions applied per item
    num_hashes: usize,
    /// Size of the bit array in bits; hash values are taken modulo this
    size: usize,
}
29
30impl BloomFilter {
31    /// Create a new Bloom filter with the given parameters
32    fn new(size: usize, num_hashes: usize) -> Self {
33        let byte_size = (size + 7) / 8; // Round up to bytes
34        Self {
35            bits: vec![0; byte_size],
36            num_hashes,
37            size,
38        }
39    }
40
41    /// Create a Bloom filter sized for the given number of expected items
42    fn with_capacity(expected_items: usize, false_positive_rate: f64) -> Self {
43        let n = expected_items as f64;
44        let p = false_positive_rate;
45
46        // Calculate optimal size: m = -n * ln(p) / (ln(2))^2
47        let ln2_squared = (2.0_f64.ln()).powi(2);
48        let size = ((-n * p.ln()) / ln2_squared).ceil() as usize;
49
50        // Calculate optimal number of hash functions: k = m/n * ln(2)
51        let num_hashes = ((size as f64 / n) * 2.0_f64.ln()).round() as usize;
52
53        Self::new(size.max(1024), num_hashes.max(1)) // Minimum size of 1024 bits
54    }
55
56    /// Add an item to the Bloom filter
57    fn add(&mut self, item: &[u8]) {
58        for i in 0..self.num_hashes {
59            let hash = self.hash(item, i);
60            let bit_index = hash % self.size;
61            let byte_index = bit_index / 8;
62            let bit_offset = bit_index % 8;
63            self.bits[byte_index] |= 1 << bit_offset;
64        }
65    }
66
67    /// Check if an item might be in the set (false positives possible, no false negatives)
68    fn might_contain(&self, item: &[u8]) -> bool {
69        for i in 0..self.num_hashes {
70            let hash = self.hash(item, i);
71            let bit_index = hash % self.size;
72            let byte_index = bit_index / 8;
73            let bit_offset = bit_index % 8;
74            if (self.bits[byte_index] & (1 << bit_offset)) == 0 {
75                return false;
76            }
77        }
78        true
79    }
80
81    /// Double hash function for generating multiple hashes
82    fn hash(&self, item: &[u8], seed: usize) -> usize {
83        use std::collections::hash_map::DefaultHasher;
84        use std::hash::{Hash, Hasher};
85
86        let mut hasher = DefaultHasher::new();
87        item.hash(&mut hasher);
88        seed.hash(&mut hasher);
89        hasher.finish() as usize
90    }
91
92    /// Serialize the Bloom filter to bytes
93    fn to_bytes(&self) -> Vec<u8> {
94        let mut bytes = Vec::new();
95        // Write size and num_hashes
96        bytes.extend_from_slice(&(self.size as u32).to_le_bytes());
97        bytes.extend_from_slice(&(self.num_hashes as u32).to_le_bytes());
98        // Write bits
99        bytes.extend_from_slice(&self.bits);
100        bytes
101    }
102
103    /// Deserialize a Bloom filter from bytes
104    fn from_bytes(data: &[u8]) -> Result<Self> {
105        if data.len() < 8 {
106            return Err(anyhow::anyhow!("Invalid Bloom filter data"));
107        }
108
109        let size = u32::from_le_bytes(data[0..4].try_into()?) as usize;
110        let num_hashes = u32::from_le_bytes(data[4..8].try_into()?) as usize;
111        let bits = data[8..].to_vec();
112
113        let expected_byte_size = (size + 7) / 8;
114        if bits.len() != expected_byte_size {
115            return Err(anyhow::anyhow!("Invalid Bloom filter bits length"));
116        }
117
118        Ok(Self { bits, num_hashes, size })
119    }
120}
121
/// Configuration for compaction behavior.
///
/// Public config types should be debuggable and comparable, so `Debug`,
/// `PartialEq`, and `Eq` are derived in addition to `Clone` (all are
/// backward-compatible additions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionConfig {
    /// Maximum number of SSTable files before triggering compaction
    pub max_sstables: usize,
    /// Minimum number of SSTable files to compact together
    pub min_compaction_files: usize,
}

impl Default for CompactionConfig {
    /// Default tuning: trigger compaction at 10 tables, merging 4 at a time.
    fn default() -> Self {
        Self {
            max_sstables: 10,
            min_compaction_files: 4,
        }
    }
}
139
/// LSM-Tree based storage engine implementation.
///
/// Writes go to the WAL first, then to the in-memory memtable; a full memtable
/// is flushed to an immutable SSTable file. Reads consult the memtable first,
/// then SSTables from newest to oldest.
pub struct LSMStorageEngine {
    /// Path to the database directory
    db_path: PathBuf,
    /// In-memory buffer (MemTable); an empty value marks a deletion (tombstone)
    memtable: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
    /// Write-Ahead Log for durability
    wal: Arc<RwLock<WAL>>,
    /// List of SSTable files (most recent first)
    sstables: Arc<RwLock<Vec<SSTableHandle>>>,
    /// Compaction configuration
    compaction_config: CompactionConfig,
}
153
154impl LSMStorageEngine {
155    /// Creates a new LSM storage engine at the specified path with default compaction config.
156    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
157        Self::with_config(path, CompactionConfig::default()).await
158    }
159
160    /// Creates a new LSM storage engine at the specified path with custom compaction config.
161    pub async fn with_config<P: AsRef<Path>>(path: P, config: CompactionConfig) -> Result<Self> {
162        let db_path = path.as_ref().to_path_buf();
163
164        // Create database directory if it doesn't exist
165        tokio::fs::create_dir_all(&db_path).await?;
166
167        let memtable = Arc::new(RwLock::new(BTreeMap::new()));
168        let wal = Arc::new(RwLock::new(WAL::new(db_path.join("wal"))?));
169
170        // Load existing SSTable files
171        let sstables = Arc::new(RwLock::new(Self::load_sstables(&db_path).await?));
172
173        Ok(Self {
174            db_path,
175            memtable,
176            wal,
177            sstables,
178            compaction_config: config,
179        })
180    }
181
182    /// Load existing SSTable files from disk, sorted by creation time (newest first)
183    async fn load_sstables(db_path: &Path) -> Result<Vec<SSTableHandle>> {
184        let mut sstables = Vec::new();
185
186        // Read directory and find SSTable files
187        let mut entries = tokio::fs::read_dir(db_path).await?;
188        let mut sstable_paths = Vec::new();
189
190        while let Some(entry) = entries.next_entry().await? {
191            let path = entry.path();
192            if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
193                if filename.starts_with("sstable_") && filename.ends_with(".dat") {
194                    sstable_paths.push(path);
195                }
196            }
197        }
198
199        // Sort by modification time (newest first)
200        sstable_paths.sort_by(|a, b| {
201            let a_meta = std::fs::metadata(a).unwrap();
202            let b_meta = std::fs::metadata(b).unwrap();
203            b_meta.modified().unwrap().cmp(&a_meta.modified().unwrap())
204        });
205
206        // Load SSTable handles
207        for path in sstable_paths {
208            match SSTableHandle::load(&path).await {
209                Ok(handle) => sstables.push(handle),
210                Err(e) => eprintln!("Warning: Failed to load SSTable {:?}: {}", path, e),
211            }
212        }
213
214        Ok(sstables)
215    }
216}
217
218#[async_trait::async_trait]
219impl StorageEngine for LSMStorageEngine {
220    async fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
221        let mut wal = self.wal.write().await;
222
223        // Write to WAL first for durability
224        wal.append(key, value).await?;
225        drop(wal); // Release WAL lock
226
227        // Then update memtable
228        let mut memtable = self.memtable.write().await;
229        memtable.insert(key.to_vec(), value.to_vec());
230
231        // Check if memtable needs to be flushed
232        let needs_flush = memtable.len() > 1000; // Simple threshold for now
233        drop(memtable); // Release the lock
234
235        if needs_flush {
236            self.flush_memtable().await?;
237        }
238
239        Ok(())
240    }
241
242    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
243        let memtable = self.memtable.read().await;
244
245        // First check memtable (most recent data)
246        if let Some(value) = memtable.get(key) {
247            // Check if it's a tombstone (empty value)
248            if value.is_empty() {
249                return Ok(None);
250            }
251            return Ok(Some(value.clone()));
252        }
253
254        // Then check SSTable files (newest to oldest)
255        let sstables = self.sstables.read().await;
256        for sstable in &*sstables {
257            if let Some(value) = sstable.search(key).await? {
258                // Found in SSTable
259                return Ok(Some(value));
260            }
261        }
262
263        // Not found anywhere
264        Ok(None)
265    }
266
267    async fn delete(&mut self, key: &[u8]) -> Result<()> {
268        let mut memtable = self.memtable.write().await;
269        let mut wal = self.wal.write().await;
270
271        // Write tombstone to WAL
272        wal.append(key, &[]).await?;
273
274        // Mark as deleted in memtable (empty value = tombstone)
275        memtable.insert(key.to_vec(), Vec::new());
276
277        Ok(())
278    }
279
280    async fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
281        let mut results = Vec::new();
282        let mut seen_keys = std::collections::HashSet::new();
283
284        // First scan memtable (most recent data)
285        let memtable = self.memtable.read().await;
286        for (key, value) in memtable.range(prefix.to_vec()..) {
287            if !key.starts_with(prefix) {
288                break;
289            }
290            if !value.is_empty() && seen_keys.insert(key.clone()) { // Skip tombstones and duplicates
291                results.push((key.clone(), value.clone()));
292            }
293        }
294        drop(memtable);
295
296        // Then scan SSTable files (newest to oldest)
297        let sstables = self.sstables.read().await;
298        for sstable in &*sstables {
299            // Only scan SSTable if its key range overlaps with our prefix
300            if prefix >= sstable.min_key.as_slice() && prefix <= sstable.max_key.as_slice() {
301                let sstable_data = tokio::fs::read(&sstable.path).await?;
302                let mut pos = 0;
303
304                while pos < sstable_data.len() {
305                    // Read key_len (4 bytes, little endian)
306                    let key_len = u32::from_le_bytes(sstable_data[pos..pos+4].try_into()?);
307                    pos += 4;
308
309                    // Read key
310                    let key = sstable_data[pos..pos + key_len as usize].to_vec();
311                    pos += key_len as usize;
312
313                    // Read value_len (4 bytes, little endian)
314                    let value_len = u32::from_le_bytes(sstable_data[pos..pos+4].try_into()?);
315                    pos += 4;
316
317                    // Check if key matches prefix and hasn't been seen in newer data
318                    if key.starts_with(prefix) && seen_keys.insert(key.clone()) {
319                        let value = sstable_data[pos..pos + value_len as usize].to_vec();
320                        results.push((key, value));
321                    } else {
322                        pos += value_len as usize;
323                    }
324                }
325            }
326        }
327
328        // Sort results by key for consistent ordering
329        results.sort_by(|a, b| a.0.cmp(&b.0));
330
331        Ok(results)
332    }
333}
334
335impl LSMStorageEngine {
336    /// Flushes the current memtable to disk as an SSTable file.
337    async fn flush_memtable(&mut self) -> Result<()> {
338        let mut memtable = self.memtable.write().await;
339        if memtable.is_empty() {
340            return Ok(());
341        }
342
343        // Generate SSTable file name
344        let timestamp = std::time::SystemTime::now()
345            .duration_since(std::time::UNIX_EPOCH)?
346            .as_millis();
347        let sstable_path = self.db_path.join(format!("sstable_{}.dat", timestamp));
348
349        // Create Bloom filter for all keys in memtable
350        let mut bloom_filter = BloomFilter::with_capacity(memtable.len(), 0.01); // 1% false positive rate
351        for key in memtable.keys() {
352            bloom_filter.add(key);
353        }
354
355        // Write SSTable file with Bloom filter header
356        let mut file = tokio::fs::File::create(&sstable_path).await?;
357
358        // Write Bloom filter
359        let bloom_bytes = bloom_filter.to_bytes();
360        let bloom_size = (bloom_bytes.len() as u32).to_le_bytes();
361        tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_size).await?;
362        tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_bytes).await?;
363
364        // Calculate and write data size
365        let mut data_size = 0u32;
366        for (key, value) in &*memtable {
367            data_size += 4 + key.len() as u32 + 4 + value.len() as u32;
368        }
369        tokio::io::AsyncWriteExt::write_all(&mut file, &data_size.to_le_bytes()).await?;
370
371        // Write memtable contents
372        for (key, value) in &*memtable {
373            // Binary format: [key_len(4bytes)][key][value_len(4bytes)][value]
374            let key_len = (key.len() as u32).to_le_bytes();
375            let value_len = (value.len() as u32).to_le_bytes();
376
377            tokio::io::AsyncWriteExt::write_all(&mut file, &key_len).await?;
378            tokio::io::AsyncWriteExt::write_all(&mut file, key).await?;
379            tokio::io::AsyncWriteExt::write_all(&mut file, &value_len).await?;
380            tokio::io::AsyncWriteExt::write_all(&mut file, value).await?;
381        }
382        file.flush().await?;
383
384        // Create SSTable handle and add to the list (newest first)
385        let sstable_handle = SSTableHandle::load(&sstable_path).await?;
386        let mut sstables = self.sstables.write().await;
387        sstables.insert(0, sstable_handle); // Insert at beginning (newest)
388
389        // Check if compaction is needed
390        let needs_compaction = sstables.len() >= self.compaction_config.max_sstables;
391
392        // Clear memtable and reset WAL
393        memtable.clear();
394        let mut wal = self.wal.write().await;
395        wal.reset().await?;
396        drop(sstables); // Release lock before compaction
397        drop(memtable); // Release memtable lock
398        drop(wal); // Release WAL lock
399
400        // Trigger compaction if needed
401        if needs_compaction {
402            self.compact().await?;
403        }
404
405        Ok(())
406    }
407
408    /// Compact SSTable files to improve read performance and reclaim space
409    async fn compact(&mut self) -> Result<()> {
410        let mut sstables = self.sstables.write().await;
411
412        // Need at least min_compaction_files SSTables to compact
413        if sstables.len() < self.compaction_config.min_compaction_files {
414            return Ok(());
415        }
416
417        // Select SSTables to compact (oldest ones)
418        let num_to_compact = std::cmp::min(self.compaction_config.min_compaction_files, sstables.len());
419        let start_idx = sstables.len() - num_to_compact;
420        let sstables_to_compact: Vec<_> = sstables.drain(start_idx..).collect();
421
422        // Release the lock temporarily
423        drop(sstables);
424
425        // Perform the compaction
426        self.perform_compaction(sstables_to_compact).await
427    }
428
429    /// Perform the actual compaction by merging SSTables
430    async fn perform_compaction(&mut self, old_sstables: Vec<SSTableHandle>) -> Result<()> {
431        // Collect all key-value pairs from SSTables to compact
432        let mut merged_data = BTreeMap::new();
433
434        for sstable in &old_sstables {
435            let data = tokio::fs::read(&sstable.path).await?;
436            let mut pos = 0;
437
438            // Skip Bloom filter header (for old format compatibility)
439            if data.len() >= 4 {
440                let bloom_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
441                pos += 4 + bloom_size;
442                if data.len() > pos + 4 {
443                    let data_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
444                    pos += 4;
445                    let data_start = pos;
446
447                    while pos < data_start + data_size {
448                        // Read key_len (4 bytes, little endian)
449                        let key_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
450                        pos += 4;
451
452                        // Read key
453                        let key = data[pos..pos + key_len as usize].to_vec();
454                        pos += key_len as usize;
455
456                        // Read value_len (4 bytes, little endian)
457                        let value_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
458                        pos += 4;
459
460                        // Read value
461                        let value = data[pos..pos + value_len as usize].to_vec();
462                        pos += value_len as usize;
463
464                        // Only keep non-tombstone entries and overwrite older values
465                        if !value.is_empty() {
466                            merged_data.insert(key, value);
467                        } else {
468                            merged_data.remove(&key);
469                        }
470                    }
471                }
472            }
473        }
474
475        // Create new compacted SSTable with Bloom filter
476        let timestamp = std::time::SystemTime::now()
477            .duration_since(std::time::UNIX_EPOCH)?
478            .as_millis();
479        let compacted_path = self.db_path.join(format!("sstable_compacted_{}.dat", timestamp));
480
481        // Create Bloom filter for merged data
482        let mut bloom_filter = BloomFilter::with_capacity(merged_data.len(), 0.01);
483        for key in merged_data.keys() {
484            bloom_filter.add(key);
485        }
486
487        let mut file = tokio::fs::File::create(&compacted_path).await?;
488
489        // Write Bloom filter
490        let bloom_bytes = bloom_filter.to_bytes();
491        let bloom_size = (bloom_bytes.len() as u32).to_le_bytes();
492        tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_size).await?;
493        tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_bytes).await?;
494
495        // Calculate and write data size
496        let mut data_size = 0u32;
497        for (key, value) in &merged_data {
498            data_size += 4 + key.len() as u32 + 4 + value.len() as u32;
499        }
500        tokio::io::AsyncWriteExt::write_all(&mut file, &data_size.to_le_bytes()).await?;
501
502        // Write merged data
503        for (key, value) in &merged_data {
504            let key_len = (key.len() as u32).to_le_bytes();
505            let value_len = (value.len() as u32).to_le_bytes();
506
507            tokio::io::AsyncWriteExt::write_all(&mut file, &key_len).await?;
508            tokio::io::AsyncWriteExt::write_all(&mut file, key).await?;
509            tokio::io::AsyncWriteExt::write_all(&mut file, &value_len).await?;
510            tokio::io::AsyncWriteExt::write_all(&mut file, value).await?;
511        }
512        file.flush().await?;
513
514        // Load the new compacted SSTable
515        let compacted_handle = SSTableHandle::load(&compacted_path).await?;
516
517        // Update SSTable list: remove old ones and add new compacted one
518        let mut sstables = self.sstables.write().await;
519        // Remove the old SSTables from the list
520        sstables.retain(|sstable| {
521            !old_sstables.iter().any(|old| old.path == sstable.path)
522        });
523        // Add the new compacted SSTable
524        sstables.push(compacted_handle);
525
526        // Delete old SSTable files
527        for old_sstable in old_sstables {
528            if let Err(e) = tokio::fs::remove_file(&old_sstable.path).await {
529                eprintln!("Warning: Failed to remove old SSTable {:?}: {}", old_sstable.path, e);
530            }
531        }
532
533        Ok(())
534    }
535}
536
/// Handle for an SSTable file: the on-disk path plus in-memory metadata
/// (key range and Bloom filter) used to skip irrelevant tables on reads.
struct SSTableHandle {
    /// Location of the SSTable file on disk
    path: PathBuf,
    /// Smallest key stored in the table
    min_key: Vec<u8>,
    /// Largest key stored in the table
    max_key: Vec<u8>,
    /// Filter over all keys in the table; used for fast negative lookups
    bloom_filter: BloomFilter,
}
544
545impl SSTableHandle {
546    async fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
547        let path = path.as_ref().to_path_buf();
548        let data = tokio::fs::read(&path).await?;
549
550        if data.len() < 4 {
551            return Err(anyhow::anyhow!("SSTable file too small"));
552        }
553
554        let mut pos = 0;
555
556        // Read Bloom filter size
557        let bloom_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
558        pos += 4;
559
560        // Read Bloom filter
561        let bloom_data = &data[pos..pos + bloom_size];
562        let bloom_filter = BloomFilter::from_bytes(bloom_data)?;
563        pos += bloom_size;
564
565        // Read data size (for validation)
566        let data_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
567        pos += 4;
568
569        // Extract min/max keys from data
570        let mut min_key = Vec::new();
571        let mut max_key = Vec::new();
572        let data_start = pos;
573
574        while pos < data_start + data_size {
575            // Read key_len (4 bytes, little endian)
576            let key_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
577            pos += 4;
578
579            // Read key
580            let key = data[pos..pos + key_len as usize].to_vec();
581            pos += key_len as usize;
582
583            // Read value_len (4 bytes, little endian)
584            let value_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
585            pos += 4;
586
587            // Skip value
588            pos += value_len as usize;
589
590            // Update min/max keys
591            if min_key.is_empty() || key < min_key {
592                min_key = key.clone();
593            }
594            if max_key.is_empty() || key > max_key {
595                max_key = key.clone();
596            }
597        }
598
599        Ok(SSTableHandle { path, min_key, max_key, bloom_filter })
600    }
601
602    async fn search(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
603        // If key is outside this SSTable's range, return None immediately
604        if key < self.min_key.as_slice() || key > self.max_key.as_slice() {
605            return Ok(None);
606        }
607
608        // Check Bloom filter first (fast negative check)
609        if !self.bloom_filter.might_contain(key) {
610            return Ok(None);
611        }
612
613        let data = tokio::fs::read(&self.path).await?;
614        let mut pos = 0;
615
616        // Skip Bloom filter and data size headers
617        let bloom_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
618        pos += 4 + bloom_size;
619        let data_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
620        pos += 4;
621
622        let data_start = pos;
623
624        while pos < data_start + data_size {
625            // Read key_len (4 bytes, little endian)
626            let key_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
627            pos += 4;
628
629            // Read key
630            let current_key = &data[pos..pos + key_len as usize];
631            pos += key_len as usize;
632
633            // Read value_len (4 bytes, little endian)
634            let value_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
635            pos += 4;
636
637            // Compare keys
638            match current_key.cmp(key) {
639                std::cmp::Ordering::Equal => {
640                    // Found the key, return the value
641                    let value = data[pos..pos + value_len as usize].to_vec();
642                    return Ok(Some(value));
643                }
644                std::cmp::Ordering::Greater => {
645                    // Key not found (passed the insertion point)
646                    return Ok(None);
647                }
648                std::cmp::Ordering::Less => {
649                    // Continue searching
650                    pos += value_len as usize;
651                }
652            }
653        }
654
655        Ok(None)
656    }
657}
658
/// Write-Ahead Log for durability.
struct WAL {
    /// Location of the log file on disk
    path: PathBuf,
    /// Lazily-opened file handle (`None` until the first append)
    file: Option<tokio::fs::File>,
    /// Monotonically increasing entry sequence number (per process lifetime)
    sequence: u64,
}
665
666impl WAL {
667    fn new(path: PathBuf) -> Result<Self> {
668        Ok(Self {
669            path,
670            file: None,
671            sequence: 0,
672        })
673    }
674
675    async fn append(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
676        if self.file.is_none() {
677            // Open or create WAL file
678            self.file = Some(tokio::fs::File::create(&self.path).await?);
679        }
680
681        if let Some(file) = &mut self.file {
682            // Write entry: [sequence(8)][key_len(4)][key][value_len(4)][value]
683            let seq_bytes = self.sequence.to_le_bytes();
684            let key_len = (key.len() as u32).to_le_bytes();
685            let value_len = (value.len() as u32).to_le_bytes();
686
687            tokio::io::AsyncWriteExt::write_all(file, &seq_bytes).await?;
688            tokio::io::AsyncWriteExt::write_all(file, &key_len).await?;
689            tokio::io::AsyncWriteExt::write_all(file, key).await?;
690            tokio::io::AsyncWriteExt::write_all(file, &value_len).await?;
691            tokio::io::AsyncWriteExt::write_all(file, value).await?;
692
693            // Flush to ensure durability
694            file.flush().await?;
695        }
696
697        self.sequence += 1;
698        Ok(())
699    }
700
701    async fn reset(&mut self) -> Result<()> {
702        if let Some(file) = &mut self.file {
703            file.flush().await?;
704            // In a real implementation, we'd rotate the WAL file here
705        }
706        Ok(())
707    }
708}