1use std::collections::BTreeMap;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tokio::io::AsyncWriteExt;
17use anyhow::Result;
18use kotoba_db_core::engine::StorageEngine;
19
/// Space-efficient probabilistic membership structure: answers "definitely
/// absent" or "possibly present", letting point lookups skip SSTable reads
/// for keys that were never written to that table.
struct BloomFilter {
    // Bit array, packed 8 bits per byte.
    bits: Vec<u8>,
    // Number of hash probes performed per item.
    num_hashes: usize,
    // Logical size of the filter in bits (bits.len() * 8 rounds up to this).
    size: usize,
}
29
30impl BloomFilter {
31 fn new(size: usize, num_hashes: usize) -> Self {
33 let byte_size = (size + 7) / 8; Self {
35 bits: vec![0; byte_size],
36 num_hashes,
37 size,
38 }
39 }
40
41 fn with_capacity(expected_items: usize, false_positive_rate: f64) -> Self {
43 let n = expected_items as f64;
44 let p = false_positive_rate;
45
46 let ln2_squared = (2.0_f64.ln()).powi(2);
48 let size = ((-n * p.ln()) / ln2_squared).ceil() as usize;
49
50 let num_hashes = ((size as f64 / n) * 2.0_f64.ln()).round() as usize;
52
53 Self::new(size.max(1024), num_hashes.max(1)) }
55
56 fn add(&mut self, item: &[u8]) {
58 for i in 0..self.num_hashes {
59 let hash = self.hash(item, i);
60 let bit_index = hash % self.size;
61 let byte_index = bit_index / 8;
62 let bit_offset = bit_index % 8;
63 self.bits[byte_index] |= 1 << bit_offset;
64 }
65 }
66
67 fn might_contain(&self, item: &[u8]) -> bool {
69 for i in 0..self.num_hashes {
70 let hash = self.hash(item, i);
71 let bit_index = hash % self.size;
72 let byte_index = bit_index / 8;
73 let bit_offset = bit_index % 8;
74 if (self.bits[byte_index] & (1 << bit_offset)) == 0 {
75 return false;
76 }
77 }
78 true
79 }
80
81 fn hash(&self, item: &[u8], seed: usize) -> usize {
83 use std::collections::hash_map::DefaultHasher;
84 use std::hash::{Hash, Hasher};
85
86 let mut hasher = DefaultHasher::new();
87 item.hash(&mut hasher);
88 seed.hash(&mut hasher);
89 hasher.finish() as usize
90 }
91
92 fn to_bytes(&self) -> Vec<u8> {
94 let mut bytes = Vec::new();
95 bytes.extend_from_slice(&(self.size as u32).to_le_bytes());
97 bytes.extend_from_slice(&(self.num_hashes as u32).to_le_bytes());
98 bytes.extend_from_slice(&self.bits);
100 bytes
101 }
102
103 fn from_bytes(data: &[u8]) -> Result<Self> {
105 if data.len() < 8 {
106 return Err(anyhow::anyhow!("Invalid Bloom filter data"));
107 }
108
109 let size = u32::from_le_bytes(data[0..4].try_into()?) as usize;
110 let num_hashes = u32::from_le_bytes(data[4..8].try_into()?) as usize;
111 let bits = data[8..].to_vec();
112
113 let expected_byte_size = (size + 7) / 8;
114 if bits.len() != expected_byte_size {
115 return Err(anyhow::anyhow!("Invalid Bloom filter bits length"));
116 }
117
118 Ok(Self { bits, num_hashes, size })
119 }
120}
121
/// Tuning knobs for background SSTable compaction.
#[derive(Clone)]
pub struct CompactionConfig {
    /// Once this many SSTables exist after a flush, compaction is triggered.
    pub max_sstables: usize,
    /// Number of (oldest) SSTables merged per compaction run; also the
    /// minimum table count below which compaction is a no-op.
    pub min_compaction_files: usize,
}
130
131impl Default for CompactionConfig {
132 fn default() -> Self {
133 Self {
134 max_sstables: 10,
135 min_compaction_files: 4,
136 }
137 }
138}
139
/// Log-structured merge-tree storage engine: writes land in a WAL and an
/// in-memory memtable, which is periodically flushed to immutable on-disk
/// SSTables that are compacted in the background.
pub struct LSMStorageEngine {
    // Directory containing the WAL and all SSTable files.
    db_path: PathBuf,
    // Sorted in-memory write buffer; an empty value is a deletion tombstone.
    memtable: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
    // Write-ahead log providing durability for un-flushed memtable entries.
    wal: Arc<RwLock<WAL>>,
    // On-disk tables ordered newest-first; readers probe them in order.
    sstables: Arc<RwLock<Vec<SSTableHandle>>>,
    // Thresholds controlling when a flush triggers compaction.
    compaction_config: CompactionConfig,
}
153
154impl LSMStorageEngine {
155 pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
157 Self::with_config(path, CompactionConfig::default()).await
158 }
159
160 pub async fn with_config<P: AsRef<Path>>(path: P, config: CompactionConfig) -> Result<Self> {
162 let db_path = path.as_ref().to_path_buf();
163
164 tokio::fs::create_dir_all(&db_path).await?;
166
167 let memtable = Arc::new(RwLock::new(BTreeMap::new()));
168 let wal = Arc::new(RwLock::new(WAL::new(db_path.join("wal"))?));
169
170 let sstables = Arc::new(RwLock::new(Self::load_sstables(&db_path).await?));
172
173 Ok(Self {
174 db_path,
175 memtable,
176 wal,
177 sstables,
178 compaction_config: config,
179 })
180 }
181
182 async fn load_sstables(db_path: &Path) -> Result<Vec<SSTableHandle>> {
184 let mut sstables = Vec::new();
185
186 let mut entries = tokio::fs::read_dir(db_path).await?;
188 let mut sstable_paths = Vec::new();
189
190 while let Some(entry) = entries.next_entry().await? {
191 let path = entry.path();
192 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
193 if filename.starts_with("sstable_") && filename.ends_with(".dat") {
194 sstable_paths.push(path);
195 }
196 }
197 }
198
199 sstable_paths.sort_by(|a, b| {
201 let a_meta = std::fs::metadata(a).unwrap();
202 let b_meta = std::fs::metadata(b).unwrap();
203 b_meta.modified().unwrap().cmp(&a_meta.modified().unwrap())
204 });
205
206 for path in sstable_paths {
208 match SSTableHandle::load(&path).await {
209 Ok(handle) => sstables.push(handle),
210 Err(e) => eprintln!("Warning: Failed to load SSTable {:?}: {}", path, e),
211 }
212 }
213
214 Ok(sstables)
215 }
216}
217
218#[async_trait::async_trait]
219impl StorageEngine for LSMStorageEngine {
220 async fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
221 let mut wal = self.wal.write().await;
222
223 wal.append(key, value).await?;
225 drop(wal); let mut memtable = self.memtable.write().await;
229 memtable.insert(key.to_vec(), value.to_vec());
230
231 let needs_flush = memtable.len() > 1000; drop(memtable); if needs_flush {
236 self.flush_memtable().await?;
237 }
238
239 Ok(())
240 }
241
242 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
243 let memtable = self.memtable.read().await;
244
245 if let Some(value) = memtable.get(key) {
247 if value.is_empty() {
249 return Ok(None);
250 }
251 return Ok(Some(value.clone()));
252 }
253
254 let sstables = self.sstables.read().await;
256 for sstable in &*sstables {
257 if let Some(value) = sstable.search(key).await? {
258 return Ok(Some(value));
260 }
261 }
262
263 Ok(None)
265 }
266
267 async fn delete(&mut self, key: &[u8]) -> Result<()> {
268 let mut memtable = self.memtable.write().await;
269 let mut wal = self.wal.write().await;
270
271 wal.append(key, &[]).await?;
273
274 memtable.insert(key.to_vec(), Vec::new());
276
277 Ok(())
278 }
279
280 async fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
281 let mut results = Vec::new();
282 let mut seen_keys = std::collections::HashSet::new();
283
284 let memtable = self.memtable.read().await;
286 for (key, value) in memtable.range(prefix.to_vec()..) {
287 if !key.starts_with(prefix) {
288 break;
289 }
290 if !value.is_empty() && seen_keys.insert(key.clone()) { results.push((key.clone(), value.clone()));
292 }
293 }
294 drop(memtable);
295
296 let sstables = self.sstables.read().await;
298 for sstable in &*sstables {
299 if prefix >= sstable.min_key.as_slice() && prefix <= sstable.max_key.as_slice() {
301 let sstable_data = tokio::fs::read(&sstable.path).await?;
302 let mut pos = 0;
303
304 while pos < sstable_data.len() {
305 let key_len = u32::from_le_bytes(sstable_data[pos..pos+4].try_into()?);
307 pos += 4;
308
309 let key = sstable_data[pos..pos + key_len as usize].to_vec();
311 pos += key_len as usize;
312
313 let value_len = u32::from_le_bytes(sstable_data[pos..pos+4].try_into()?);
315 pos += 4;
316
317 if key.starts_with(prefix) && seen_keys.insert(key.clone()) {
319 let value = sstable_data[pos..pos + value_len as usize].to_vec();
320 results.push((key, value));
321 } else {
322 pos += value_len as usize;
323 }
324 }
325 }
326 }
327
328 results.sort_by(|a, b| a.0.cmp(&b.0));
330
331 Ok(results)
332 }
333}
334
335impl LSMStorageEngine {
336 async fn flush_memtable(&mut self) -> Result<()> {
338 let mut memtable = self.memtable.write().await;
339 if memtable.is_empty() {
340 return Ok(());
341 }
342
343 let timestamp = std::time::SystemTime::now()
345 .duration_since(std::time::UNIX_EPOCH)?
346 .as_millis();
347 let sstable_path = self.db_path.join(format!("sstable_{}.dat", timestamp));
348
349 let mut bloom_filter = BloomFilter::with_capacity(memtable.len(), 0.01); for key in memtable.keys() {
352 bloom_filter.add(key);
353 }
354
355 let mut file = tokio::fs::File::create(&sstable_path).await?;
357
358 let bloom_bytes = bloom_filter.to_bytes();
360 let bloom_size = (bloom_bytes.len() as u32).to_le_bytes();
361 tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_size).await?;
362 tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_bytes).await?;
363
364 let mut data_size = 0u32;
366 for (key, value) in &*memtable {
367 data_size += 4 + key.len() as u32 + 4 + value.len() as u32;
368 }
369 tokio::io::AsyncWriteExt::write_all(&mut file, &data_size.to_le_bytes()).await?;
370
371 for (key, value) in &*memtable {
373 let key_len = (key.len() as u32).to_le_bytes();
375 let value_len = (value.len() as u32).to_le_bytes();
376
377 tokio::io::AsyncWriteExt::write_all(&mut file, &key_len).await?;
378 tokio::io::AsyncWriteExt::write_all(&mut file, key).await?;
379 tokio::io::AsyncWriteExt::write_all(&mut file, &value_len).await?;
380 tokio::io::AsyncWriteExt::write_all(&mut file, value).await?;
381 }
382 file.flush().await?;
383
384 let sstable_handle = SSTableHandle::load(&sstable_path).await?;
386 let mut sstables = self.sstables.write().await;
387 sstables.insert(0, sstable_handle); let needs_compaction = sstables.len() >= self.compaction_config.max_sstables;
391
392 memtable.clear();
394 let mut wal = self.wal.write().await;
395 wal.reset().await?;
396 drop(sstables); drop(memtable); drop(wal); if needs_compaction {
402 self.compact().await?;
403 }
404
405 Ok(())
406 }
407
408 async fn compact(&mut self) -> Result<()> {
410 let mut sstables = self.sstables.write().await;
411
412 if sstables.len() < self.compaction_config.min_compaction_files {
414 return Ok(());
415 }
416
417 let num_to_compact = std::cmp::min(self.compaction_config.min_compaction_files, sstables.len());
419 let start_idx = sstables.len() - num_to_compact;
420 let sstables_to_compact: Vec<_> = sstables.drain(start_idx..).collect();
421
422 drop(sstables);
424
425 self.perform_compaction(sstables_to_compact).await
427 }
428
429 async fn perform_compaction(&mut self, old_sstables: Vec<SSTableHandle>) -> Result<()> {
431 let mut merged_data = BTreeMap::new();
433
434 for sstable in &old_sstables {
435 let data = tokio::fs::read(&sstable.path).await?;
436 let mut pos = 0;
437
438 if data.len() >= 4 {
440 let bloom_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
441 pos += 4 + bloom_size;
442 if data.len() > pos + 4 {
443 let data_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
444 pos += 4;
445 let data_start = pos;
446
447 while pos < data_start + data_size {
448 let key_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
450 pos += 4;
451
452 let key = data[pos..pos + key_len as usize].to_vec();
454 pos += key_len as usize;
455
456 let value_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
458 pos += 4;
459
460 let value = data[pos..pos + value_len as usize].to_vec();
462 pos += value_len as usize;
463
464 if !value.is_empty() {
466 merged_data.insert(key, value);
467 } else {
468 merged_data.remove(&key);
469 }
470 }
471 }
472 }
473 }
474
475 let timestamp = std::time::SystemTime::now()
477 .duration_since(std::time::UNIX_EPOCH)?
478 .as_millis();
479 let compacted_path = self.db_path.join(format!("sstable_compacted_{}.dat", timestamp));
480
481 let mut bloom_filter = BloomFilter::with_capacity(merged_data.len(), 0.01);
483 for key in merged_data.keys() {
484 bloom_filter.add(key);
485 }
486
487 let mut file = tokio::fs::File::create(&compacted_path).await?;
488
489 let bloom_bytes = bloom_filter.to_bytes();
491 let bloom_size = (bloom_bytes.len() as u32).to_le_bytes();
492 tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_size).await?;
493 tokio::io::AsyncWriteExt::write_all(&mut file, &bloom_bytes).await?;
494
495 let mut data_size = 0u32;
497 for (key, value) in &merged_data {
498 data_size += 4 + key.len() as u32 + 4 + value.len() as u32;
499 }
500 tokio::io::AsyncWriteExt::write_all(&mut file, &data_size.to_le_bytes()).await?;
501
502 for (key, value) in &merged_data {
504 let key_len = (key.len() as u32).to_le_bytes();
505 let value_len = (value.len() as u32).to_le_bytes();
506
507 tokio::io::AsyncWriteExt::write_all(&mut file, &key_len).await?;
508 tokio::io::AsyncWriteExt::write_all(&mut file, key).await?;
509 tokio::io::AsyncWriteExt::write_all(&mut file, &value_len).await?;
510 tokio::io::AsyncWriteExt::write_all(&mut file, value).await?;
511 }
512 file.flush().await?;
513
514 let compacted_handle = SSTableHandle::load(&compacted_path).await?;
516
517 let mut sstables = self.sstables.write().await;
519 sstables.retain(|sstable| {
521 !old_sstables.iter().any(|old| old.path == sstable.path)
522 });
523 sstables.push(compacted_handle);
525
526 for old_sstable in old_sstables {
528 if let Err(e) = tokio::fs::remove_file(&old_sstable.path).await {
529 eprintln!("Warning: Failed to remove old SSTable {:?}: {}", old_sstable.path, e);
530 }
531 }
532
533 Ok(())
534 }
535}
536
/// In-memory metadata for one immutable on-disk SSTable.
struct SSTableHandle {
    // Location of the table file on disk.
    path: PathBuf,
    // Smallest key present in the table (prunes out-of-range lookups).
    min_key: Vec<u8>,
    // Largest key present in the table.
    max_key: Vec<u8>,
    // Filter over all keys; a negative answer skips the file read entirely.
    bloom_filter: BloomFilter,
}
544
545impl SSTableHandle {
546 async fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
547 let path = path.as_ref().to_path_buf();
548 let data = tokio::fs::read(&path).await?;
549
550 if data.len() < 4 {
551 return Err(anyhow::anyhow!("SSTable file too small"));
552 }
553
554 let mut pos = 0;
555
556 let bloom_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
558 pos += 4;
559
560 let bloom_data = &data[pos..pos + bloom_size];
562 let bloom_filter = BloomFilter::from_bytes(bloom_data)?;
563 pos += bloom_size;
564
565 let data_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
567 pos += 4;
568
569 let mut min_key = Vec::new();
571 let mut max_key = Vec::new();
572 let data_start = pos;
573
574 while pos < data_start + data_size {
575 let key_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
577 pos += 4;
578
579 let key = data[pos..pos + key_len as usize].to_vec();
581 pos += key_len as usize;
582
583 let value_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
585 pos += 4;
586
587 pos += value_len as usize;
589
590 if min_key.is_empty() || key < min_key {
592 min_key = key.clone();
593 }
594 if max_key.is_empty() || key > max_key {
595 max_key = key.clone();
596 }
597 }
598
599 Ok(SSTableHandle { path, min_key, max_key, bloom_filter })
600 }
601
602 async fn search(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
603 if key < self.min_key.as_slice() || key > self.max_key.as_slice() {
605 return Ok(None);
606 }
607
608 if !self.bloom_filter.might_contain(key) {
610 return Ok(None);
611 }
612
613 let data = tokio::fs::read(&self.path).await?;
614 let mut pos = 0;
615
616 let bloom_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
618 pos += 4 + bloom_size;
619 let data_size = u32::from_le_bytes(data[pos..pos+4].try_into()?) as usize;
620 pos += 4;
621
622 let data_start = pos;
623
624 while pos < data_start + data_size {
625 let key_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
627 pos += 4;
628
629 let current_key = &data[pos..pos + key_len as usize];
631 pos += key_len as usize;
632
633 let value_len = u32::from_le_bytes(data[pos..pos+4].try_into()?);
635 pos += 4;
636
637 match current_key.cmp(key) {
639 std::cmp::Ordering::Equal => {
640 let value = data[pos..pos + value_len as usize].to_vec();
642 return Ok(Some(value));
643 }
644 std::cmp::Ordering::Greater => {
645 return Ok(None);
647 }
648 std::cmp::Ordering::Less => {
649 pos += value_len as usize;
651 }
652 }
653 }
654
655 Ok(None)
656 }
657}
658
/// Append-only write-ahead log providing durability for memtable contents
/// between flushes.
struct WAL {
    // On-disk log file location.
    path: PathBuf,
    // Lazily-opened file handle (None until the first append).
    file: Option<tokio::fs::File>,
    // Monotonic record counter written with each entry.
    sequence: u64,
}
665
666impl WAL {
667 fn new(path: PathBuf) -> Result<Self> {
668 Ok(Self {
669 path,
670 file: None,
671 sequence: 0,
672 })
673 }
674
675 async fn append(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
676 if self.file.is_none() {
677 self.file = Some(tokio::fs::File::create(&self.path).await?);
679 }
680
681 if let Some(file) = &mut self.file {
682 let seq_bytes = self.sequence.to_le_bytes();
684 let key_len = (key.len() as u32).to_le_bytes();
685 let value_len = (value.len() as u32).to_le_bytes();
686
687 tokio::io::AsyncWriteExt::write_all(file, &seq_bytes).await?;
688 tokio::io::AsyncWriteExt::write_all(file, &key_len).await?;
689 tokio::io::AsyncWriteExt::write_all(file, key).await?;
690 tokio::io::AsyncWriteExt::write_all(file, &value_len).await?;
691 tokio::io::AsyncWriteExt::write_all(file, value).await?;
692
693 file.flush().await?;
695 }
696
697 self.sequence += 1;
698 Ok(())
699 }
700
701 async fn reset(&mut self) -> Result<()> {
702 if let Some(file) = &mut self.file {
703 file.flush().await?;
704 }
706 Ok(())
707 }
708}