1use rustlite_core::{Error, Result};
38use rustlite_wal::{RecordPayload, SyncMode, WalConfig, WalManager, WalRecord};
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex, RwLock};
41
42pub mod compaction;
43pub mod manifest;
44pub mod memtable;
45pub mod sstable;
46
47pub use compaction::{CompactionConfig, CompactionStats, CompactionWorker};
48pub use manifest::{Manifest, ManifestSSTable};
49pub use memtable::{Memtable, MemtableEntry};
50pub use sstable::{SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
51
52const DEFAULT_MEMTABLE_SIZE: u64 = 4 * 1024 * 1024;
54
/// Configuration for the storage engine.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Flush the active memtable to a level-0 SSTable once it reaches
    /// this many bytes (checked after every `put`).
    pub memtable_size: u64,
    /// Durability mode forwarded to the write-ahead log.
    pub sync_mode: SyncMode,
    /// Tuning knobs forwarded to the compaction worker.
    pub compaction: CompactionConfig,
    /// When `false`, `flush` never triggers a compaction pass.
    pub enable_compaction: bool,
}
67
68impl Default for StorageConfig {
69 fn default() -> Self {
70 Self {
71 memtable_size: DEFAULT_MEMTABLE_SIZE,
72 sync_mode: SyncMode::Sync,
73 compaction: CompactionConfig::default(),
74 enable_compaction: true,
75 }
76 }
77}
78
/// An LSM-tree storage engine: writes go to a WAL and an in-memory
/// memtable; full memtables are flushed to on-disk SSTables tracked by
/// a manifest, and optionally compacted.
pub struct StorageEngine {
    /// Root data directory (contains the `wal/` and `sst/` subdirectories).
    dir: PathBuf,
    /// Engine configuration captured at open time.
    config: StorageConfig,
    /// Active (mutable) memtable receiving all new writes.
    memtable: Arc<RwLock<Memtable>>,
    /// Memtables swapped out by `flush` and awaiting SSTable write;
    /// newest last, so reads scan them in reverse.
    immutable_memtables: Arc<Mutex<Vec<Arc<Memtable>>>>,
    /// Write-ahead log replayed on startup for crash recovery.
    wal: Arc<Mutex<WalManager>>,
    /// On-disk catalog of SSTables and the persisted sequence number.
    manifest: Arc<Mutex<Manifest>>,
    /// Level-0 compaction worker.
    compactor: Arc<Mutex<CompactionWorker>>,
    /// Monotonic write sequence counter (bumped on every put/delete).
    sequence: Arc<RwLock<u64>>,
}
100
101impl StorageEngine {
102 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104 Self::open_with_config(path, StorageConfig::default())
105 }
106
    /// Opens (or creates) an engine at `path` with an explicit config.
    ///
    /// Creates the data directory plus its `wal/` and `sst/`
    /// subdirectories, opens the WAL and manifest, then replays the WAL
    /// into a fresh memtable via `recover`.
    pub fn open_with_config(path: impl AsRef<Path>, config: StorageConfig) -> Result<Self> {
        let dir = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&dir)?;

        // Layout: <dir>/wal holds log segments, <dir>/sst holds table files.
        std::fs::create_dir_all(dir.join("wal"))?;
        std::fs::create_dir_all(dir.join("sst"))?;

        let wal_config = WalConfig {
            wal_dir: dir.join("wal"),
            sync_mode: config.sync_mode,
            ..Default::default()
        };
        let mut wal = WalManager::new(wal_config)?;
        wal.open()?;

        // The manifest persists the last durable sequence number; the new
        // memtable starts from it so sequence numbers stay monotonic
        // across restarts.
        let manifest = Manifest::open(&dir)?;
        let sequence = manifest.sequence();

        let compactor = CompactionWorker::new(&dir, config.compaction.clone());

        let memtable = Memtable::with_sequence(sequence);

        let engine = Self {
            dir,
            config,
            memtable: Arc::new(RwLock::new(memtable)),
            immutable_memtables: Arc::new(Mutex::new(Vec::new())),
            wal: Arc::new(Mutex::new(wal)),
            manifest: Arc::new(Mutex::new(manifest)),
            compactor: Arc::new(Mutex::new(compactor)),
            sequence: Arc::new(RwLock::new(sequence)),
        };

        // Replay any WAL records written since the last flush.
        engine.recover()?;

        Ok(engine)
    }
151
    /// Replays WAL records into the active memtable after a restart.
    ///
    /// NOTE(review): nothing in this module truncates the WAL after a
    /// flush, so records already persisted to SSTables may be replayed
    /// here too — re-applying puts/deletes is idempotent, but confirm
    /// the WAL manager's retention/checkpoint policy.
    ///
    /// NOTE(review): `self.sequence` is not advanced per replayed
    /// record; presumably the manifest's persisted sequence already
    /// covers these writes — verify.
    fn recover(&self) -> Result<()> {
        let wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
        let records = wal.recover()?;

        let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;

        for record in records {
            match &record.payload {
                RecordPayload::Put { key, value } => {
                    memtable.put(key.clone(), value.clone());
                }
                RecordPayload::Delete { key } => {
                    memtable.delete(key.clone());
                }
                // Other payload kinds do not affect key/value state.
                _ => {}
            }
        }

        Ok(())
    }
173
174 pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
176 let _seq = {
178 let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
179 *sequence += 1;
180 *sequence
181 };
182
183 {
185 let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
186 let record = WalRecord::put(key.to_vec(), value.to_vec());
187 wal.append(record)?;
188 }
189
190 {
192 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
193 memtable.put(key.to_vec(), value.to_vec());
194 }
195
196 self.maybe_flush()?;
198
199 Ok(())
200 }
201
202 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
204 {
206 let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
207 if let Some(result) = memtable.get(key) {
208 return match result {
209 Some(value) => Ok(Some(value.to_vec())),
210 None => Ok(None), };
212 }
213 }
214
215 {
217 let immutable = self
218 .immutable_memtables
219 .lock()
220 .map_err(|_| Error::LockPoisoned)?;
221 for mt in immutable.iter().rev() {
222 if let Some(result) = mt.get(key) {
223 return match result {
224 Some(value) => Ok(Some(value.to_vec())),
225 None => Ok(None), };
227 }
228 }
229 }
230
231 {
233 let manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
234
235 for level in 0..7 {
237 let sstables = manifest.sstables_at_level(level);
238
239 let mut sorted: Vec<_> = sstables.iter().collect();
241 sorted.sort_by(|a, b| b.sequence.cmp(&a.sequence));
242
243 for sst in sorted {
244 if key < sst.min_key.as_slice() || key > sst.max_key.as_slice() {
246 continue;
247 }
248
249 let path = PathBuf::from(&sst.path);
251 if let Ok(mut reader) = SSTableReader::open(&path) {
252 if let Ok(Some(entry)) = reader.get(key) {
253 if entry.is_tombstone() {
254 return Ok(None);
255 }
256 return Ok(Some(entry.value));
257 }
258 }
259 }
260 }
261 }
262
263 Ok(None)
264 }
265
266 pub fn delete(&self, key: &[u8]) -> Result<()> {
268 let _seq = {
270 let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
271 *sequence += 1;
272 *sequence
273 };
274
275 {
277 let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
278 let record = WalRecord::delete(key.to_vec());
279 wal.append(record)?;
280 }
281
282 {
284 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
285 memtable.delete(key.to_vec());
286 }
287
288 Ok(())
289 }
290
291 fn maybe_flush(&self) -> Result<()> {
293 let should_flush = {
294 let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
295 memtable.size_bytes() >= self.config.memtable_size
296 };
297
298 if should_flush {
299 self.flush()?;
300 }
301
302 Ok(())
303 }
304
305 pub fn flush(&self) -> Result<()> {
307 let old_memtable = {
309 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
310 let sequence = memtable.sequence();
311 let old = std::mem::replace(&mut *memtable, Memtable::with_sequence(sequence));
312 Arc::new(old)
313 };
314
315 if old_memtable.is_empty() {
316 return Ok(());
317 }
318
319 {
321 let mut immutable = self
322 .immutable_memtables
323 .lock()
324 .map_err(|_| Error::LockPoisoned)?;
325 immutable.push(Arc::clone(&old_memtable));
326 }
327
328 let timestamp = std::time::SystemTime::now()
330 .duration_since(std::time::UNIX_EPOCH)
331 .unwrap_or_default()
332 .as_millis();
333 let sst_path = self.dir.join("sst").join(format!("L0_{}.sst", timestamp));
334
335 let mt_for_iter = {
337 let entries: Vec<_> = old_memtable
338 .iter()
339 .map(|(k, v)| (k.clone(), v.clone()))
340 .collect();
341 entries
342 };
343
344 let meta = SSTableWriter::from_memtable(&sst_path, mt_for_iter.into_iter())?;
346
347 {
349 let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
350 manifest.add_sstable(&meta)?;
351 manifest.update_sequence(old_memtable.sequence())?;
352 }
353
354 {
356 let mut immutable = self
357 .immutable_memtables
358 .lock()
359 .map_err(|_| Error::LockPoisoned)?;
360 immutable.retain(|m| !Arc::ptr_eq(m, &old_memtable));
361 }
362
363 if self.config.enable_compaction {
365 self.maybe_compact()?;
366 }
367
368 Ok(())
369 }
370
371 fn maybe_compact(&self) -> Result<()> {
373 let mut compactor = self.compactor.lock().map_err(|_| Error::LockPoisoned)?;
374 let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
375
376 if compactor.needs_compaction(&manifest) {
377 compactor.compact_level0(&mut manifest)?;
378 }
379
380 Ok(())
381 }
382
    /// Forces buffered state to disk: syncs the WAL, flushes the active
    /// memtable to an SSTable, then rewrites the manifest.
    ///
    /// NOTE(review): the WAL is synced *before* the memtable flush, so
    /// a crash between the two steps still recovers from the WAL —
    /// confirm this ordering is intentional.
    pub fn sync(&self) -> Result<()> {
        {
            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
            wal.sync()?;
        }

        self.flush()?;

        {
            let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
            manifest.rewrite()?;
        }

        Ok(())
    }
402
403 pub fn stats(&self) -> StorageStats {
405 let memtable = self.memtable.read().ok();
406 let manifest = self.manifest.lock().ok();
407 let compactor = self.compactor.lock().ok();
408
409 let (memtable_size, memtable_entries) = match &memtable {
410 Some(m) => (m.size_bytes(), m.len()),
411 None => (0, 0),
412 };
413
414 StorageStats {
415 memtable_size,
416 memtable_entries,
417 sstable_count: manifest
418 .as_ref()
419 .map(|m| m.all_sstables().len())
420 .unwrap_or(0),
421 total_disk_size: manifest.as_ref().map(|m| m.total_size()).unwrap_or(0),
422 level_counts: manifest.map(|m| m.level_counts()).unwrap_or_default(),
423 compaction_stats: compactor.map(|c| c.stats().clone()).unwrap_or_default(),
424 }
425 }
426
    /// Flushes and syncs everything, then consumes the engine.
    ///
    /// NOTE(review): `sync` itself calls `flush`, so the explicit flush
    /// here is redundant (though harmless — the second flush sees an
    /// empty memtable and returns early).
    pub fn close(self) -> Result<()> {
        self.flush()?;
        self.sync()?;
        Ok(())
    }
434}
435
/// Point-in-time snapshot of engine metrics, produced by
/// [`StorageEngine::stats`].
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Approximate byte size of the active memtable.
    pub memtable_size: u64,
    /// Entry count of the active memtable.
    pub memtable_entries: usize,
    /// Total number of SSTables registered in the manifest.
    pub sstable_count: usize,
    /// Sum of on-disk SSTable sizes, in bytes.
    pub total_disk_size: u64,
    /// SSTable count per level, indexed by level number.
    pub level_counts: Vec<usize>,
    /// Cumulative compaction counters.
    pub compaction_stats: CompactionStats,
}
452
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Round trip: written keys are readable, absent keys return None.
    #[test]
    fn test_storage_engine_basic() {
        let dir = tempdir().unwrap();
        let engine = StorageEngine::open(dir.path()).unwrap();

        engine.put(b"key1", b"value1").unwrap();
        engine.put(b"key2", b"value2").unwrap();

        assert_eq!(engine.get(b"key1").unwrap(), Some(b"value1".to_vec()));
        assert_eq!(engine.get(b"key2").unwrap(), Some(b"value2".to_vec()));
        assert_eq!(engine.get(b"key3").unwrap(), None);
    }

    // A second put to the same key shadows the first value.
    #[test]
    fn test_storage_engine_update() {
        let dir = tempdir().unwrap();
        let engine = StorageEngine::open(dir.path()).unwrap();

        engine.put(b"key", b"value1").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value1".to_vec()));

        engine.put(b"key", b"value2").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value2".to_vec()));
    }

    // A delete tombstone hides the previously written value.
    #[test]
    fn test_storage_engine_delete() {
        let dir = tempdir().unwrap();
        let engine = StorageEngine::open(dir.path()).unwrap();

        engine.put(b"key", b"value").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value".to_vec()));

        engine.delete(b"key").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), None);
    }

    // With a tiny memtable limit (compaction off), data stays readable
    // after an explicit flush, and the stats reflect where it lives.
    #[test]
    fn test_storage_engine_flush() {
        let dir = tempdir().unwrap();
        let config = StorageConfig {
            memtable_size: 100, enable_compaction: false,
            ..Default::default()
        };
        let engine = StorageEngine::open_with_config(dir.path(), config).unwrap();

        for i in 0..10 {
            let key = format!("key{:03}", i);
            let value = format!("value{}", i);
            engine.put(key.as_bytes(), value.as_bytes()).unwrap();
        }

        engine.flush().unwrap();

        assert_eq!(engine.get(b"key000").unwrap(), Some(b"value0".to_vec()));

        let stats = engine.stats();
        assert!(stats.sstable_count > 0 || stats.memtable_entries > 0);
    }

    // Data written by one engine instance survives a reopen (WAL replay).
    #[test]
    fn test_storage_engine_recovery() {
        let dir = tempdir().unwrap();

        {
            let engine = StorageEngine::open(dir.path()).unwrap();
            engine.put(b"persistent", b"data").unwrap();
        }

        {
            let engine = StorageEngine::open(dir.path()).unwrap();
            assert_eq!(engine.get(b"persistent").unwrap(), Some(b"data".to_vec()));
        }
    }

    // stats() reports a non-empty memtable after a write.
    #[test]
    fn test_storage_stats() {
        let dir = tempdir().unwrap();
        let engine = StorageEngine::open(dir.path()).unwrap();

        engine.put(b"key", b"value").unwrap();

        let stats = engine.stats();
        assert!(stats.memtable_size > 0 || stats.memtable_entries > 0);
    }
}