1use rustlite_core::{Error, Result};
38use rustlite_wal::{RecordPayload, SyncMode, WalConfig, WalManager, WalRecord};
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex, RwLock};
41
42pub mod compaction;
43pub mod manifest;
44pub mod memtable;
45pub mod sstable;
46
47pub use compaction::{CompactionConfig, CompactionStats, CompactionWorker};
48pub use manifest::{Manifest, ManifestSSTable};
49pub use memtable::{Memtable, MemtableEntry};
50pub use sstable::{SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
51
/// Default size budget for the active memtable before a flush is triggered: 4 MiB.
const DEFAULT_MEMTABLE_SIZE: u64 = 4 * 1024 * 1024;
54
/// Tuning knobs for [`StorageEngine`].
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Size threshold in bytes at which the active memtable is flushed to a level-0 SSTable.
    pub memtable_size: u64,
    /// Durability mode forwarded to the WAL (see `rustlite_wal::SyncMode`).
    pub sync_mode: SyncMode,
    /// Settings forwarded to the compaction worker.
    pub compaction: CompactionConfig,
    /// When `false`, flushing never triggers compaction.
    pub enable_compaction: bool,
}
67
68impl Default for StorageConfig {
69 fn default() -> Self {
70 Self {
71 memtable_size: DEFAULT_MEMTABLE_SIZE,
72 sync_mode: SyncMode::Sync,
73 compaction: CompactionConfig::default(),
74 enable_compaction: true,
75 }
76 }
77}
78
/// An LSM-tree storage engine: a WAL for durability, an in-memory memtable for
/// recent writes, and SSTables on disk (catalogued by a manifest) for flushed data.
pub struct StorageEngine {
    /// Root data directory; holds the `wal/` and `sst/` subdirectories.
    dir: PathBuf,
    /// Configuration captured at open time.
    config: StorageConfig,
    /// Active (mutable) memtable that receives all new writes.
    memtable: Arc<RwLock<Memtable>>,
    /// Sealed memtables awaiting their SSTable write; kept readable meanwhile.
    immutable_memtables: Arc<Mutex<Vec<Arc<Memtable>>>>,
    /// Write-ahead log; every mutation is appended here before the memtable.
    wal: Arc<Mutex<WalManager>>,
    /// Durable catalog of SSTables and the last persisted sequence number.
    manifest: Arc<Mutex<Manifest>>,
    /// Level-0 compaction worker.
    compactor: Arc<Mutex<CompactionWorker>>,
    /// In-memory mutation counter, bumped once per put/delete.
    sequence: Arc<RwLock<u64>>,
}
100
101impl StorageEngine {
102 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104 Self::open_with_config(path, StorageConfig::default())
105 }
106
107 pub fn open_with_config(path: impl AsRef<Path>, config: StorageConfig) -> Result<Self> {
109 let dir = path.as_ref().to_path_buf();
110 std::fs::create_dir_all(&dir)?;
111
112 std::fs::create_dir_all(dir.join("wal"))?;
114 std::fs::create_dir_all(dir.join("sst"))?;
115
116 let wal_config = WalConfig {
118 wal_dir: dir.join("wal"),
119 sync_mode: config.sync_mode,
120 ..Default::default()
121 };
122 let mut wal = WalManager::new(wal_config)?;
123 wal.open()?;
124
125 let manifest = Manifest::open(&dir)?;
127 let sequence = manifest.sequence();
128
129 let compactor = CompactionWorker::new(&dir, config.compaction.clone());
131
132 let memtable = Memtable::with_sequence(sequence);
134
135 let engine = Self {
136 dir,
137 config,
138 memtable: Arc::new(RwLock::new(memtable)),
139 immutable_memtables: Arc::new(Mutex::new(Vec::new())),
140 wal: Arc::new(Mutex::new(wal)),
141 manifest: Arc::new(Mutex::new(manifest)),
142 compactor: Arc::new(Mutex::new(compactor)),
143 sequence: Arc::new(RwLock::new(sequence)),
144 };
145
146 engine.recover()?;
148
149 Ok(engine)
150 }
151
152 fn recover(&self) -> Result<()> {
154 let wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
155 let records = wal.recover()?;
156
157 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
158
159 for record in records {
160 match &record.payload {
161 RecordPayload::Put { key, value } => {
162 memtable.put(key.clone(), value.clone());
163 }
164 RecordPayload::Delete { key } => {
165 memtable.delete(key.clone());
166 }
167 _ => {}
168 }
169 }
170
171 Ok(())
172 }
173
174 pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
176 let _seq = {
178 let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
179 *sequence += 1;
180 *sequence
181 };
182
183 {
185 let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
186 let record = WalRecord::put(key.to_vec(), value.to_vec());
187 wal.append(record)?;
188 }
189
190 {
192 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
193 memtable.put(key.to_vec(), value.to_vec());
194 }
195
196 self.maybe_flush()?;
198
199 Ok(())
200 }
201
202 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
204 {
206 let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
207 if let Some(result) = memtable.get(key) {
208 return match result {
209 Some(value) => Ok(Some(value.to_vec())),
210 None => Ok(None), };
212 }
213 }
214
215 {
217 let immutable = self.immutable_memtables.lock().map_err(|_| Error::LockPoisoned)?;
218 for mt in immutable.iter().rev() {
219 if let Some(result) = mt.get(key) {
220 return match result {
221 Some(value) => Ok(Some(value.to_vec())),
222 None => Ok(None), };
224 }
225 }
226 }
227
228 {
230 let manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
231
232 for level in 0..7 {
234 let sstables = manifest.sstables_at_level(level);
235
236 let mut sorted: Vec<_> = sstables.iter().collect();
238 sorted.sort_by(|a, b| b.sequence.cmp(&a.sequence));
239
240 for sst in sorted {
241 if key < sst.min_key.as_slice() || key > sst.max_key.as_slice() {
243 continue;
244 }
245
246 let path = PathBuf::from(&sst.path);
248 if let Ok(mut reader) = SSTableReader::open(&path) {
249 if let Ok(Some(entry)) = reader.get(key) {
250 if entry.is_tombstone() {
251 return Ok(None);
252 }
253 return Ok(Some(entry.value));
254 }
255 }
256 }
257 }
258 }
259
260 Ok(None)
261 }
262
263 pub fn delete(&self, key: &[u8]) -> Result<()> {
265 let _seq = {
267 let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
268 *sequence += 1;
269 *sequence
270 };
271
272 {
274 let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
275 let record = WalRecord::delete(key.to_vec());
276 wal.append(record)?;
277 }
278
279 {
281 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
282 memtable.delete(key.to_vec());
283 }
284
285 Ok(())
286 }
287
288 fn maybe_flush(&self) -> Result<()> {
290 let should_flush = {
291 let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
292 memtable.size_bytes() >= self.config.memtable_size
293 };
294
295 if should_flush {
296 self.flush()?;
297 }
298
299 Ok(())
300 }
301
302 pub fn flush(&self) -> Result<()> {
304 let old_memtable = {
306 let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
307 let sequence = memtable.sequence();
308 let old = std::mem::replace(&mut *memtable, Memtable::with_sequence(sequence));
309 Arc::new(old)
310 };
311
312 if old_memtable.is_empty() {
313 return Ok(());
314 }
315
316 {
318 let mut immutable = self.immutable_memtables.lock().map_err(|_| Error::LockPoisoned)?;
319 immutable.push(Arc::clone(&old_memtable));
320 }
321
322 let timestamp = std::time::SystemTime::now()
324 .duration_since(std::time::UNIX_EPOCH)
325 .unwrap_or_default()
326 .as_millis();
327 let sst_path = self.dir.join("sst").join(format!("L0_{}.sst", timestamp));
328
329 let mt_for_iter = {
331 let entries: Vec<_> = old_memtable.iter()
332 .map(|(k, v)| (k.clone(), v.clone()))
333 .collect();
334 entries
335 };
336
337 let meta = SSTableWriter::from_memtable(&sst_path, mt_for_iter.into_iter())?;
339
340 {
342 let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
343 manifest.add_sstable(&meta)?;
344 manifest.update_sequence(old_memtable.sequence())?;
345 }
346
347 {
349 let mut immutable = self.immutable_memtables.lock().map_err(|_| Error::LockPoisoned)?;
350 immutable.retain(|m| !Arc::ptr_eq(m, &old_memtable));
351 }
352
353 if self.config.enable_compaction {
355 self.maybe_compact()?;
356 }
357
358 Ok(())
359 }
360
361 fn maybe_compact(&self) -> Result<()> {
363 let mut compactor = self.compactor.lock().map_err(|_| Error::LockPoisoned)?;
364 let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
365
366 if compactor.needs_compaction(&manifest) {
367 compactor.compact_level0(&mut manifest)?;
368 }
369
370 Ok(())
371 }
372
373 pub fn sync(&self) -> Result<()> {
375 {
377 let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
378 wal.sync()?;
379 }
380
381 self.flush()?;
383
384 {
386 let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
387 manifest.rewrite()?;
388 }
389
390 Ok(())
391 }
392
393 pub fn stats(&self) -> StorageStats {
395 let memtable = self.memtable.read().ok();
396 let manifest = self.manifest.lock().ok();
397 let compactor = self.compactor.lock().ok();
398
399 let (memtable_size, memtable_entries) = match &memtable {
400 Some(m) => (m.size_bytes(), m.len()),
401 None => (0, 0),
402 };
403
404 StorageStats {
405 memtable_size,
406 memtable_entries,
407 sstable_count: manifest.as_ref().map(|m| m.all_sstables().len()).unwrap_or(0),
408 total_disk_size: manifest.as_ref().map(|m| m.total_size()).unwrap_or(0),
409 level_counts: manifest.map(|m| m.level_counts()).unwrap_or_default(),
410 compaction_stats: compactor.map(|c| c.stats().clone()).unwrap_or_default(),
411 }
412 }
413
414 pub fn close(self) -> Result<()> {
416 self.flush()?;
418 self.sync()?;
419 Ok(())
420 }
421}
422
/// Point-in-time snapshot of engine statistics, as returned by `StorageEngine::stats`.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Bytes currently held in the active memtable.
    pub memtable_size: u64,
    /// Number of entries in the active memtable.
    pub memtable_entries: usize,
    /// Total number of SSTables recorded in the manifest.
    pub sstable_count: usize,
    /// Total on-disk size of all SSTables, in bytes.
    pub total_disk_size: u64,
    /// SSTable count per level, indexed by level number.
    pub level_counts: Vec<usize>,
    /// Counters reported by the compaction worker.
    pub compaction_stats: CompactionStats,
}
439
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Opens an engine backed by a fresh temporary directory. The TempDir guard
    // is returned so the directory outlives the engine under test.
    fn fresh_engine() -> (tempfile::TempDir, StorageEngine) {
        let tmp = tempdir().unwrap();
        let db = StorageEngine::open(tmp.path()).unwrap();
        (tmp, db)
    }

    #[test]
    fn test_storage_engine_basic() {
        let (_tmp, db) = fresh_engine();

        db.put(b"key1", b"value1").unwrap();
        db.put(b"key2", b"value2").unwrap();

        // Present keys round-trip; an absent key reads as None.
        assert_eq!(db.get(b"key1").unwrap(), Some(b"value1".to_vec()));
        assert_eq!(db.get(b"key2").unwrap(), Some(b"value2".to_vec()));
        assert_eq!(db.get(b"key3").unwrap(), None);
    }

    #[test]
    fn test_storage_engine_update() {
        let (_tmp, db) = fresh_engine();

        // Re-putting the same key must make the newest value win.
        for value in [&b"value1"[..], &b"value2"[..]] {
            db.put(b"key", value).unwrap();
            assert_eq!(db.get(b"key").unwrap(), Some(value.to_vec()));
        }
    }

    #[test]
    fn test_storage_engine_delete() {
        let (_tmp, db) = fresh_engine();

        db.put(b"key", b"value").unwrap();
        assert_eq!(db.get(b"key").unwrap(), Some(b"value".to_vec()));

        // After deletion the key must read as absent.
        db.delete(b"key").unwrap();
        assert_eq!(db.get(b"key").unwrap(), None);
    }

    #[test]
    fn test_storage_engine_flush() {
        let tmp = tempdir().unwrap();
        let config = StorageConfig {
            memtable_size: 100,
            enable_compaction: false,
            ..Default::default()
        };
        let db = StorageEngine::open_with_config(tmp.path(), config).unwrap();

        for i in 0..10 {
            db.put(format!("key{:03}", i).as_bytes(), format!("value{}", i).as_bytes())
                .unwrap();
        }

        db.flush().unwrap();

        // Flushed data must still be readable (now served from disk).
        assert_eq!(db.get(b"key000").unwrap(), Some(b"value0".to_vec()));

        let stats = db.stats();
        assert!(stats.sstable_count > 0 || stats.memtable_entries > 0);
    }

    #[test]
    fn test_storage_engine_recovery() {
        let tmp = tempdir().unwrap();

        // First instance writes and is dropped without an explicit close.
        {
            let db = StorageEngine::open(tmp.path()).unwrap();
            db.put(b"persistent", b"data").unwrap();
        }

        // Reopening the same directory must recover the write from the WAL.
        {
            let db = StorageEngine::open(tmp.path()).unwrap();
            assert_eq!(db.get(b"persistent").unwrap(), Some(b"data".to_vec()));
        }
    }

    #[test]
    fn test_storage_stats() {
        let (_tmp, db) = fresh_engine();

        db.put(b"key", b"value").unwrap();

        let stats = db.stats();
        assert!(stats.memtable_size > 0 || stats.memtable_entries > 0);
    }
}