Skip to main content

lsm_db/
db.rs

1//! The storage engine.
2//!
3//! [`Lsm`] ties the in-memory memtable and the on-disk runs into the
4//! log-structured merge write path. Writes accumulate in the memtable; when it
5//! fills it is flushed to a new sorted run; a background thread compacts the runs
6//! into one when they grow too numerous. Reads consult the memtable, then each
7//! run newest first, merging where a range is requested.
8//!
9//! ## Concurrency
10//!
//! Engine state — the memtable, the ordered list of runs, and the write-ahead
//! log — lives behind a single [`RwLock`]. Reads snapshot what they need (a
//! value, or the run handles and a memtable slice) under a brief read lock, then
//! do their file I/O without holding it. Writes take the write lock; a write that
//! fills the memtable also flushes it to a new run before releasing that lock.
//! Compaction does its expensive merge with no lock held and takes the write
//! lock only to commit the manifest and swap the finished run in, so it blocks
//! reads and writes only for that commit. A run removed by compaction is
//! reference counted: its file is deleted only when the last reader still
//! holding it has finished (see [`SsTable`]'s `Drop`).
19
20use std::collections::HashSet;
21use std::fs;
22use std::ops::{Bound, RangeBounds};
23use std::path::{Path, PathBuf};
24use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
25use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
26use std::thread::JoinHandle;
27
28use crate::batch::{Batch, Op};
29use crate::bloom::{self, RunFilter};
30use crate::cache::BlockCache;
31use crate::config::LsmConfig;
32use crate::durability::Durability;
33use crate::error::{Error, Result};
34use crate::manifest::{self, Manifest};
35use crate::memtable::MemTable;
36use crate::merge::Merge;
37use crate::record::Record;
38use crate::scan::Scan;
39use crate::sstable::{SsTable, SsTableWriter};
40
41/// Mutable engine state guarded by one lock.
42#[derive(Debug)]
43struct Inner {
44    memtable: MemTable,
45    /// Live runs, newest first.
46    runs: Vec<Arc<SsTable>>,
47    /// Write-ahead log (a no-op unless the `durability` feature is enabled).
48    durability: Durability,
49}
50
/// Hand-off flags shared between engine handles and the background compactor.
/// It is guarded by `Engine::compaction` and signalled through `Engine::cond`.
#[derive(Debug, Default)]
struct CompactionState {
    /// A compaction has been asked for, but the compactor has not picked it up yet.
    pending: bool,
    /// The compactor thread is in the middle of a compaction attempt.
    running: bool,
    /// The owning handle is going away, so the compactor must exit.
    shutdown: bool,
    /// Count of finished compaction attempts, successful or not. Waiters can use
    /// it to tell that progress has been made.
    generation: u64,
}
64
65/// Shared engine internals, owned jointly by the handle and the compactor thread.
66#[derive(Debug)]
67struct Engine {
68    dir: PathBuf,
69    config: LsmConfig,
70    inner: RwLock<Inner>,
71    /// Next run sequence number to allocate. Atomic so a compaction can reserve
72    /// its output name without holding the engine lock.
73    next_seq: AtomicU64,
74    /// Guards against two compactions overlapping.
75    compacting: AtomicBool,
76    compaction: Mutex<CompactionState>,
77    cond: Condvar,
78    /// The last error a background compaction produced, if any.
79    last_error: Mutex<Option<Error>>,
80    /// Shared cache of decoded data blocks, consulted by point lookups across
81    /// every run.
82    cache: Arc<BlockCache>,
83}
84
85/// A log-structured merge-tree key-value store backed by a directory on disk.
86///
87/// `Lsm` is the Tier-1 entry point: [`open`](Lsm::open), [`put`](Lsm::put),
88/// [`get`](Lsm::get), [`delete`](Lsm::delete), and [`scan`](Lsm::scan) cover the
89/// whole common case. Keys and values are arbitrary byte strings; keys are
90/// ordered lexicographically.
91///
92/// The handle is cheap to share: every method takes `&self`, and the type is
93/// [`Send`] + [`Sync`], so one engine can be wrapped in an
94/// [`Arc`](std::sync::Arc) and used from many threads.
95///
96/// A background thread compacts runs as they accumulate. Dropping the `Lsm`
97/// signals that thread to stop and joins it, so no work outlives the handle.
98///
99/// # Durability
100///
101/// Flushed runs are `fsync`ed and recorded in a manifest, so flushed data
102/// survives reopening and a crash mid-flush or mid-compaction recovers to a
103/// consistent run set. Writes still buffered in the memtable when the process
104/// exits without a flush are **not** yet crash-safe; write-ahead logging arrives
105/// under the `durability` feature in a later release. Call [`flush`](Lsm::flush)
106/// to force the buffer to disk on demand.
107///
108/// # Examples
109///
110/// ```
111/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
112/// let dir = tempfile::tempdir()?;
113/// let db = lsm_db::Lsm::open(dir.path())?;
114///
115/// db.put(b"hello", b"world")?;
116/// assert_eq!(db.get(b"hello")?, Some(b"world".to_vec()));
117///
118/// db.delete(b"hello")?;
119/// assert_eq!(db.get(b"hello")?, None);
120/// # Ok(())
121/// # }
122/// ```
123#[derive(Debug)]
124pub struct Lsm {
125    engine: Arc<Engine>,
126    compactor: Option<JoinHandle<()>>,
127}
128
129impl Lsm {
130    /// Open the database in `dir`, creating the directory if it does not exist,
131    /// using the default [`LsmConfig`].
132    ///
133    /// The run set recorded in the manifest is reopened, so flushed data is
134    /// visible immediately. Temporary files and run files orphaned by a crash
135    /// mid-flush or mid-compaction are reclaimed.
136    ///
137    /// # Examples
138    ///
139    /// ```
140    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
141    /// let dir = tempfile::tempdir()?;
142    /// let db = lsm_db::Lsm::open(dir.path())?;
143    /// db.put(b"k", b"v")?;
144    /// # Ok(())
145    /// # }
146    /// ```
147    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
148        Self::open_with(dir, LsmConfig::default())
149    }
150
151    /// Open the database in `dir` with an explicit [`LsmConfig`].
152    ///
153    /// # Examples
154    ///
155    /// ```
156    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
157    /// use lsm_db::{Lsm, LsmConfig};
158    /// let dir = tempfile::tempdir()?;
159    /// let db = Lsm::open_with(dir.path(), LsmConfig::new().memtable_capacity(64 * 1024))?;
160    /// db.put(b"k", b"v")?;
161    /// # Ok(())
162    /// # }
163    /// ```
164    pub fn open_with(dir: impl AsRef<Path>, config: LsmConfig) -> Result<Self> {
165        let dir = dir.as_ref().to_path_buf();
166        fs::create_dir_all(&dir).map_err(|e| Error::io("create database directory", e))?;
167
168        let manifest = Manifest::load(&dir)?;
169        let (run_names, manifest_seq) = match manifest {
170            Some(m) => (m.runs, m.next_seq),
171            None => (Vec::new(), 0),
172        };
173        let live: HashSet<&str> = run_names.iter().map(String::as_str).collect();
174
175        // Shared block cache for point lookups, sized from the config.
176        let cache = BlockCache::new(config.block_cache_capacity_bytes());
177
178        // Open the live runs in recency order, attaching each run's bloom filter
179        // from its sidecar (a no-op without the `bloom` feature, and a tolerated
180        // miss if a sidecar is absent — the run is simply consulted directly) and
181        // the shared block cache.
182        let mut runs = Vec::with_capacity(run_names.len());
183        for name in &run_names {
184            let path = dir.join(name);
185            if !path.exists() {
186                return Err(Error::corruption("manifest references a missing run"));
187            }
188            let mut table = SsTable::open(&path)?;
189            table.attach_filter(RunFilter::load(&path)?);
190            table.attach_cache(Arc::clone(&cache));
191            runs.push(Arc::new(table));
192        }
193
194        // Reclaim orphans (temporaries and runs no longer in the manifest) and
195        // make sure the next sequence number is past every file on disk.
196        let mut next_seq = manifest_seq;
197        for entry in fs::read_dir(&dir).map_err(|e| Error::io("scan database directory", e))? {
198            let entry = entry.map_err(|e| Error::io("read directory entry", e))?;
199            let name = entry.file_name().to_string_lossy().into_owned();
200            if name.ends_with(".tmp") {
201                fs::remove_file(entry.path()).map_err(|e| Error::io("remove temporary file", e))?;
202            } else if let Some(run) = name.strip_suffix(".bloom") {
203                // A bloom sidecar whose run is not live is an orphan (e.g. from a
204                // compaction that crashed before its manifest commit).
205                if !live.contains(run) {
206                    fs::remove_file(entry.path())
207                        .map_err(|e| Error::io("remove orphan bloom sidecar", e))?;
208                }
209            } else if let Some(seq) = manifest::seq_of(&name) {
210                next_seq = next_seq.max(seq + 1);
211                if !live.contains(name.as_str()) {
212                    fs::remove_file(entry.path()).map_err(|e| Error::io("remove orphan run", e))?;
213                }
214            }
215        }
216
217        // Open the write-ahead log and replay any writes not yet flushed into a
218        // fresh memtable. With the `durability` feature off this is a no-op.
219        let durability = Durability::open(&dir)?;
220        let mut memtable = MemTable::new();
221        durability.replay(&mut memtable)?;
222
223        let engine = Arc::new(Engine {
224            dir,
225            config,
226            inner: RwLock::new(Inner {
227                memtable,
228                runs,
229                durability,
230            }),
231            next_seq: AtomicU64::new(next_seq),
232            compacting: AtomicBool::new(false),
233            compaction: Mutex::new(CompactionState::default()),
234            cond: Condvar::new(),
235            last_error: Mutex::new(None),
236            cache,
237        });
238
239        // Checkpoint recovered writes: persist them as a run and empty the log,
240        // so recovery only ever replays the writes since the most recent flush.
241        engine.flush()?;
242
243        let compactor = {
244            let engine = Arc::clone(&engine);
245            std::thread::Builder::new()
246                .name("lsm-compactor".to_owned())
247                .spawn(move || compactor_loop(&engine))
248                .map_err(|e| Error::io("spawn compaction thread", e))?
249        };
250
251        Ok(Lsm {
252            engine,
253            compactor: Some(compactor),
254        })
255    }
256
257    /// Set `key` to `value`, overwriting any previous value.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
263    /// let dir = tempfile::tempdir()?;
264    /// let db = lsm_db::Lsm::open(dir.path())?;
265    /// db.put(b"key", b"value")?;
266    /// assert_eq!(db.get(b"key")?, Some(b"value".to_vec()));
267    /// # Ok(())
268    /// # }
269    /// ```
270    pub fn put(&self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Result<()> {
271        self.engine.put(key.as_ref(), value.as_ref())
272    }
273
274    /// Delete `key`. A subsequent [`get`](Lsm::get) returns `None`.
275    ///
276    /// Deleting a key that is not present is not an error.
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
282    /// let dir = tempfile::tempdir()?;
283    /// let db = lsm_db::Lsm::open(dir.path())?;
284    /// db.put(b"key", b"value")?;
285    /// db.delete(b"key")?;
286    /// assert_eq!(db.get(b"key")?, None);
287    /// db.delete(b"never-existed")?; // not an error
288    /// # Ok(())
289    /// # }
290    /// ```
291    pub fn delete(&self, key: impl AsRef<[u8]>) -> Result<()> {
292        self.engine.delete(key.as_ref())
293    }
294
295    /// Apply a [`Batch`] of writes as one group.
296    ///
297    /// The whole batch is applied under a single lock acquisition, so concurrent
298    /// readers observe either none or all of it.
299    ///
300    /// # Examples
301    ///
302    /// ```
303    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
304    /// use lsm_db::Batch;
305    /// let dir = tempfile::tempdir()?;
306    /// let db = lsm_db::Lsm::open(dir.path())?;
307    ///
308    /// let mut batch = Batch::new();
309    /// batch.put(b"a", b"1");
310    /// batch.put(b"b", b"2");
311    /// batch.delete(b"c");
312    /// db.write(batch)?;
313    ///
314    /// assert_eq!(db.get(b"a")?, Some(b"1".to_vec()));
315    /// # Ok(())
316    /// # }
317    /// ```
318    pub fn write(&self, batch: Batch) -> Result<()> {
319        self.engine.write(batch)
320    }
321
322    /// Look up `key`, returning its value, or `None` if it is absent or deleted.
323    ///
324    /// # Examples
325    ///
326    /// ```
327    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
328    /// let dir = tempfile::tempdir()?;
329    /// let db = lsm_db::Lsm::open(dir.path())?;
330    /// assert_eq!(db.get(b"missing")?, None);
331    /// db.put(b"present", b"1")?;
332    /// assert_eq!(db.get(b"present")?, Some(b"1".to_vec()));
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
337        self.engine.get(key.as_ref())
338    }
339
340    /// Iterate the live `(key, value)` pairs whose key falls in `range`, in
341    /// ascending key order.
342    ///
343    /// The range is taken over `Vec<u8>` bounds, so the usual syntaxes all work:
344    /// `..`, `a..b`, `a..=b`, `a..`, `..b`. The returned [`Scan`] is a consistent
345    /// snapshot taken when `scan` is called; later writes do not affect it.
346    ///
347    /// # Examples
348    ///
349    /// ```
350    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
351    /// let dir = tempfile::tempdir()?;
352    /// let db = lsm_db::Lsm::open(dir.path())?;
353    /// db.put(b"a", b"1")?;
354    /// db.put(b"b", b"2")?;
355    /// db.put(b"c", b"3")?;
356    ///
357    /// let mid: Vec<_> = db.scan(b"a".to_vec()..b"c".to_vec())?.collect();
358    /// assert_eq!(mid, vec![(b"a".to_vec(), b"1".to_vec()), (b"b".to_vec(), b"2".to_vec())]);
359    /// assert_eq!(db.scan(..)?.count(), 3);
360    /// # Ok(())
361    /// # }
362    /// ```
363    pub fn scan<R>(&self, range: R) -> Result<Scan>
364    where
365        R: RangeBounds<Vec<u8>>,
366    {
367        self.engine.scan(range)
368    }
369
370    /// Force the in-memory buffer to disk as a new run.
371    ///
372    /// Flushing an empty buffer is a no-op. After a successful flush every
373    /// previously written key is durable and will be read back on reopen.
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
379    /// let dir = tempfile::tempdir()?;
380    /// let db = lsm_db::Lsm::open(dir.path())?;
381    /// db.put(b"k", b"v")?;
382    /// db.flush()?;
383    ///
384    /// drop(db);
385    /// let db = lsm_db::Lsm::open(dir.path())?;
386    /// assert_eq!(db.get(b"k")?, Some(b"v".to_vec()));
387    /// # Ok(())
388    /// # }
389    /// ```
390    pub fn flush(&self) -> Result<()> {
391        self.engine.flush()
392    }
393
394    /// Run one compaction synchronously. Test-only; production compaction is the
395    /// background thread.
396    #[cfg(test)]
397    pub(crate) fn compact_now(&self) -> Result<()> {
398        self.engine.compact_once()
399    }
400
401    /// The number of live on-disk runs. Test-only.
402    #[cfg(test)]
403    pub(crate) fn run_count(&self) -> usize {
404        self.engine.read_guard().runs.len()
405    }
406
407    /// Block until the background compactor is idle (nothing pending or running).
408    /// Test-only.
409    #[cfg(test)]
410    pub(crate) fn wait_for_idle(&self) {
411        let mut state = self
412            .engine
413            .compaction
414            .lock()
415            .unwrap_or_else(|p| p.into_inner());
416        while state.pending || state.running {
417            state = self
418                .engine
419                .cond
420                .wait(state)
421                .unwrap_or_else(|p| p.into_inner());
422        }
423    }
424}
425
426impl Drop for Lsm {
427    fn drop(&mut self) {
428        {
429            let mut state = self
430                .engine
431                .compaction
432                .lock()
433                .unwrap_or_else(|p| p.into_inner());
434            state.shutdown = true;
435        }
436        self.engine.cond.notify_all();
437        if let Some(handle) = self.compactor.take() {
438            let _ = handle.join();
439        }
440    }
441}
442
443impl Engine {
444    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
445        let record = Record::Value(value.to_vec());
446        let mut inner = self.write_guard();
447        // Durably log the write before it is acknowledged, then buffer it.
448        inner.durability.log_one(key, &record)?;
449        inner.memtable.apply(key.to_vec(), record);
450        self.maybe_flush(&mut inner)
451    }
452
453    fn delete(&self, key: &[u8]) -> Result<()> {
454        let record = Record::Tombstone;
455        let mut inner = self.write_guard();
456        inner.durability.log_one(key, &record)?;
457        inner.memtable.apply(key.to_vec(), record);
458        self.maybe_flush(&mut inner)
459    }
460
461    fn write(&self, batch: Batch) -> Result<()> {
462        let ops: Vec<(Vec<u8>, Record)> = batch
463            .into_ops()
464            .into_iter()
465            .map(|(key, op)| {
466                let record = match op {
467                    Op::Put(value) => Record::Value(value),
468                    Op::Delete => Record::Tombstone,
469                };
470                (key, record)
471            })
472            .collect();
473
474        let mut inner = self.write_guard();
475        // The whole batch is logged as one record, so it is recovered atomically.
476        inner.durability.log_batch(&ops)?;
477        for (key, record) in ops {
478            inner.memtable.apply(key, record);
479        }
480        self.maybe_flush(&mut inner)
481    }
482
483    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
484        let runs = {
485            let inner = self.read_guard();
486            match inner.memtable.get(key) {
487                Some(Record::Value(value)) => return Ok(Some(value.clone())),
488                Some(Record::Tombstone) => return Ok(None),
489                None => inner.runs.clone(),
490            }
491        };
492        // Runs are searched newest first, with no lock held. The bloom filter
493        // lets a definite miss skip the run without reading any block.
494        for run in &runs {
495            if !run.might_contain(key) {
496                continue;
497            }
498            match run.lookup(key)? {
499                Some(Record::Value(value)) => return Ok(Some(value)),
500                Some(Record::Tombstone) => return Ok(None),
501                None => {}
502            }
503        }
504        Ok(None)
505    }
506
507    fn scan<R>(&self, range: R) -> Result<Scan>
508    where
509        R: RangeBounds<Vec<u8>>,
510    {
511        let (mem, runs) = {
512            let inner = self.read_guard();
513            let mem: Vec<(Vec<u8>, Record)> = inner
514                .memtable
515                .iter()
516                .filter(|(k, _)| matches!(position(&range, k), Pos::In))
517                .map(|(k, r)| (k.clone(), r.clone()))
518                .collect();
519            (mem, inner.runs.clone())
520        };
521
522        let cursors = runs.iter().map(|r| r.cursor()).collect();
523        let mut out = Vec::new();
524        for item in Merge::new(mem, cursors) {
525            let (key, value) = item?;
526            match position(&range, &key) {
527                Pos::Below => {}
528                Pos::In => out.push((key, value)),
529                Pos::Above => break, // ascending stream: nothing further qualifies
530            }
531        }
532        Ok(Scan::new(out))
533    }
534
535    fn flush(&self) -> Result<()> {
536        let mut inner = self.write_guard();
537        if inner.memtable.is_empty() {
538            return Ok(());
539        }
540        self.flush_locked(&mut inner)
541    }
542
543    /// Flush if the buffer has reached the configured capacity.
544    fn maybe_flush(&self, inner: &mut Inner) -> Result<()> {
545        if !inner.memtable.is_empty()
546            && inner.memtable.approx_size() >= self.config.memtable_capacity_bytes()
547        {
548            self.flush_locked(inner)?;
549        }
550        Ok(())
551    }
552
553    /// Write the memtable to a new run and install it, newest first.
554    fn flush_locked(&self, inner: &mut Inner) -> Result<()> {
555        let entries = inner.memtable.take();
556        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
557        let name = manifest::run_filename(seq);
558        let tmp = self.dir.join(format!("{name}.tmp"));
559        let final_path = self.dir.join(&name);
560
561        let mut writer = SsTableWriter::create(&tmp)?;
562        let mut filter = bloom::builder(entries.len());
563        for (key, record) in &entries {
564            writer.push(key, record)?;
565            filter.add(key);
566        }
567        writer.finish()?;
568        fs::rename(&tmp, &final_path).map_err(|e| Error::io("install flushed run", e))?;
569
570        // Write the bloom sidecar before the manifest commit, so any run the
571        // manifest names is guaranteed to have its sidecar on disk.
572        let filter = filter.finish();
573        if let Some(filter) = &filter {
574            filter.write_sidecar(&final_path)?;
575        }
576        let mut table = SsTable::open(&final_path)?;
577        table.attach_filter(filter);
578        table.attach_cache(Arc::clone(&self.cache));
579
580        let run = Arc::new(table);
581        let mut new_runs = Vec::with_capacity(inner.runs.len() + 1);
582        new_runs.push(run);
583        new_runs.extend(inner.runs.iter().cloned());
584
585        let names: Vec<String> = new_runs.iter().map(|r| r.file_name()).collect();
586        Manifest::store(&self.dir, self.next_seq.load(Ordering::SeqCst), &names)?;
587        inner.runs = new_runs;
588
589        // The flushed writes are now durable in the run, so the log that held
590        // them can be emptied; recovery replays only writes since this flush.
591        inner.durability.rotate()?;
592
593        if inner.runs.len() >= self.config.compaction_trigger_runs() {
594            self.signal_compaction();
595        }
596        Ok(())
597    }
598
599    /// Merge every current run into a single new run, then swap it in.
600    ///
601    /// The merge runs with no lock held; only the final swap and manifest write
602    /// take the write lock. Concurrency safety rests on the snapshot taken up
603    /// front and on reference-counted run files (see module docs).
604    fn compact_once(&self) -> Result<()> {
605        if self.compacting.swap(true, Ordering::AcqRel) {
606            return Ok(()); // another compaction is already running
607        }
608        let result = self.compact_inner();
609        self.compacting.store(false, Ordering::Release);
610        result
611    }
612
613    fn compact_inner(&self) -> Result<()> {
614        // Snapshot the runs to merge. Anything flushed after this stays newer
615        // than the output and is preserved by the swap.
616        let inputs: Vec<Arc<SsTable>> = {
617            let inner = self.read_guard();
618            if inner.runs.len() < 2 {
619                return Ok(());
620            }
621            inner.runs.clone()
622        };
623
624        // Size the output filter from the sum of input entry counts — an upper
625        // bound (dedup only lowers the real count), so the filter is never
626        // under-sized.
627        let capacity: usize = inputs
628            .iter()
629            .map(|r| usize::try_from(r.entry_count()).unwrap_or(usize::MAX))
630            .fold(0usize, |acc, n| acc.saturating_add(n));
631
632        // Merge into a new run with no lock held.
633        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
634        let name = manifest::run_filename(seq);
635        let tmp = self.dir.join(format!("{name}.tmp"));
636        let final_path = self.dir.join(&name);
637        let mut filter = bloom::builder(capacity);
638        {
639            let mut writer = SsTableWriter::create(&tmp)?;
640            let cursors = inputs.iter().map(|r| r.cursor()).collect();
641            // Merging every run, so this output is the only level — tombstones
642            // have nothing left to mask and are dropped.
643            for item in Merge::new(Vec::new(), cursors) {
644                let (key, value) = item?;
645                writer.push(&key, &Record::Value(value))?;
646                filter.add(&key);
647            }
648            writer.finish()?;
649        }
650        fs::rename(&tmp, &final_path).map_err(|e| Error::io("install compacted run", e))?;
651
652        let filter = filter.finish();
653        if let Some(filter) = &filter {
654            filter.write_sidecar(&final_path)?;
655        }
656        let mut output = SsTable::open(&final_path)?;
657        output.attach_filter(filter);
658        output.attach_cache(Arc::clone(&self.cache));
659        let output = Arc::new(output);
660
661        // Swap: drop the inputs, keep any runs flushed during the merge, append
662        // the output as the oldest run.
663        {
664            let mut inner = self.write_guard();
665            let mut new_runs: Vec<Arc<SsTable>> = inner
666                .runs
667                .iter()
668                .filter(|r| !inputs.iter().any(|i| Arc::ptr_eq(i, r)))
669                .cloned()
670                .collect();
671            new_runs.push(Arc::clone(&output));
672
673            let names: Vec<String> = new_runs.iter().map(|r| r.file_name()).collect();
674            // The manifest rename is the commit point. If it fails the output is
675            // an orphan the next open reclaims, and the live set is unchanged.
676            Manifest::store(&self.dir, self.next_seq.load(Ordering::SeqCst), &names)?;
677            for input in &inputs {
678                input.mark_obsolete();
679            }
680            inner.runs = new_runs;
681        }
682        // Drop the snapshot; obsolete files are removed once no reader holds them.
683        drop(inputs);
684        Ok(())
685    }
686
687    /// Ask the background compactor to run.
688    fn signal_compaction(&self) {
689        let mut state = self.compaction.lock().unwrap_or_else(|p| p.into_inner());
690        state.pending = true;
691        self.cond.notify_all();
692    }
693
694    fn read_guard(&self) -> RwLockReadGuard<'_, Inner> {
695        self.inner.read().unwrap_or_else(|p| p.into_inner())
696    }
697
698    fn write_guard(&self) -> RwLockWriteGuard<'_, Inner> {
699        self.inner.write().unwrap_or_else(|p| p.into_inner())
700    }
701}
702
703/// The background compaction loop: wait for work, compact, repeat until shutdown.
704fn compactor_loop(engine: &Engine) {
705    loop {
706        {
707            let mut state = engine.compaction.lock().unwrap_or_else(|p| p.into_inner());
708            while !state.pending && !state.shutdown {
709                state = engine.cond.wait(state).unwrap_or_else(|p| p.into_inner());
710            }
711            if state.shutdown {
712                return;
713            }
714            state.pending = false;
715            state.running = true;
716        }
717
718        let result = engine.compact_once();
719
720        {
721            let mut state = engine.compaction.lock().unwrap_or_else(|p| p.into_inner());
722            state.running = false;
723            state.generation += 1;
724            if let Err(err) = result {
725                *engine.last_error.lock().unwrap_or_else(|p| p.into_inner()) = Some(err);
726            }
727            engine.cond.notify_all();
728        }
729    }
730}
731
/// Placement of a key with respect to a range of keys.
enum Pos {
    /// The key sorts before the range's start.
    Below,
    /// The key lies inside the range.
    In,
    /// The key sorts after the range's end.
    Above,
}

/// Decides whether `key` falls before, inside, or after `range`.
///
/// The start bound is tested first. A key that fails both bounds (possible
/// only for an empty or inverted range) is therefore reported as `Below`.
fn position<R: RangeBounds<Vec<u8>>>(range: &R, key: &[u8]) -> Pos {
    let range_starts_after_key = match range.start_bound() {
        Bound::Unbounded => false,
        Bound::Included(lo) => lo.as_slice() > key,
        Bound::Excluded(lo) => lo.as_slice() >= key,
    };
    if range_starts_after_key {
        return Pos::Below;
    }

    let range_ends_before_key = match range.end_bound() {
        Bound::Unbounded => false,
        Bound::Included(hi) => hi.as_slice() < key,
        Bound::Excluded(hi) => hi.as_slice() <= key,
    };
    if range_ends_before_key {
        Pos::Above
    } else {
        Pos::In
    }
}
756
757#[cfg(test)]
758#[allow(clippy::unwrap_used, clippy::expect_used)]
759mod tests {
760    use super::*;
761
762    /// Open with compaction effectively disabled, for deterministic tests.
763    fn db_no_autocompact() -> (tempfile::TempDir, Lsm) {
764        let dir = tempfile::tempdir().unwrap();
765        let db =
766            Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(usize::MAX)).unwrap();
767        (dir, db)
768    }
769
770    fn db() -> (tempfile::TempDir, Lsm) {
771        let dir = tempfile::tempdir().unwrap();
772        let db = Lsm::open(dir.path()).unwrap();
773        (dir, db)
774    }
775
776    #[test]
777    fn test_put_get_roundtrip() {
778        let (_d, db) = db();
779        db.put(b"k", b"v").unwrap();
780        assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
781    }
782
783    #[test]
784    fn test_get_absent_is_none() {
785        let (_d, db) = db();
786        assert_eq!(db.get(b"absent").unwrap(), None);
787    }
788
789    #[test]
790    fn test_overwrite_across_runs() {
791        let (_d, db) = db_no_autocompact();
792        db.put(b"k", b"old").unwrap();
793        db.flush().unwrap();
794        db.put(b"k", b"new").unwrap();
795        db.flush().unwrap();
796        assert_eq!(db.run_count(), 2);
797        assert_eq!(db.get(b"k").unwrap(), Some(b"new".to_vec()));
798    }
799
800    #[test]
801    fn test_delete_masks_value_across_runs() {
802        let (_d, db) = db_no_autocompact();
803        db.put(b"k", b"v").unwrap();
804        db.flush().unwrap();
805        db.delete(b"k").unwrap();
806        db.flush().unwrap();
807        assert_eq!(db.get(b"k").unwrap(), None);
808    }
809
810    #[test]
811    fn test_compaction_merges_to_single_run() {
812        let (_d, db) = db_no_autocompact();
813        for i in 0..5u32 {
814            db.put(format!("k{i}").into_bytes(), format!("v{i}").into_bytes())
815                .unwrap();
816            db.flush().unwrap();
817        }
818        assert_eq!(db.run_count(), 5);
819        db.compact_now().unwrap();
820        assert_eq!(db.run_count(), 1);
821        for i in 0..5u32 {
822            assert_eq!(
823                db.get(format!("k{i}").into_bytes()).unwrap(),
824                Some(format!("v{i}").into_bytes())
825            );
826        }
827    }
828
829    #[test]
830    fn test_compaction_drops_tombstones_and_keeps_latest() {
831        let (_d, db) = db_no_autocompact();
832        db.put(b"keep", b"1").unwrap();
833        db.put(b"gone", b"x").unwrap();
834        db.flush().unwrap();
835        db.put(b"keep", b"2").unwrap(); // newer value
836        db.delete(b"gone").unwrap(); // tombstone
837        db.flush().unwrap();
838        db.compact_now().unwrap();
839
840        assert_eq!(db.run_count(), 1);
841        assert_eq!(db.get(b"keep").unwrap(), Some(b"2".to_vec()));
842        assert_eq!(db.get(b"gone").unwrap(), None);
843        // The compacted run holds exactly one live entry.
844        assert_eq!(db.scan(..).unwrap().count(), 1);
845    }
846
847    #[test]
848    fn test_reopen_reads_all_runs() {
849        let dir = tempfile::tempdir().unwrap();
850        {
851            let db = Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(usize::MAX))
852                .unwrap();
853            db.put(b"a", b"1").unwrap();
854            db.flush().unwrap();
855            db.put(b"b", b"2").unwrap();
856            db.flush().unwrap();
857            db.put(b"a", b"updated").unwrap();
858            db.flush().unwrap();
859        }
860        let db = Lsm::open(dir.path()).unwrap();
861        assert_eq!(db.get(b"a").unwrap(), Some(b"updated".to_vec()));
862        assert_eq!(db.get(b"b").unwrap(), Some(b"2".to_vec()));
863    }
864
865    #[test]
866    fn test_reopen_after_compaction() {
867        let dir = tempfile::tempdir().unwrap();
868        {
869            let db = Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(usize::MAX))
870                .unwrap();
871            for i in 0..4u32 {
872                db.put(format!("k{i}").into_bytes(), b"v").unwrap();
873                db.flush().unwrap();
874            }
875            db.compact_now().unwrap();
876            assert_eq!(db.run_count(), 1);
877        }
878        let db = Lsm::open(dir.path()).unwrap();
879        assert_eq!(db.run_count(), 1);
880        assert_eq!(db.scan(..).unwrap().count(), 4);
881    }
882
883    #[test]
884    fn test_background_compaction_triggers() {
885        let dir = tempfile::tempdir().unwrap();
886        let db = Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(3)).unwrap();
887        for i in 0..10u32 {
888            db.put(format!("k{i:02}").into_bytes(), b"v").unwrap();
889            db.flush().unwrap();
890        }
891        db.wait_for_idle();
892        // Compaction should have collapsed the runs well below the flush count.
893        assert!(db.run_count() <= 3, "run count was {}", db.run_count());
894        for i in 0..10u32 {
895            assert_eq!(
896                db.get(format!("k{i:02}").into_bytes()).unwrap(),
897                Some(b"v".to_vec())
898            );
899        }
900    }
901
902    #[test]
903    fn test_scan_merges_across_runs() {
904        let (_d, db) = db_no_autocompact();
905        db.put(b"a", b"old-a").unwrap();
906        db.put(b"c", b"3").unwrap();
907        db.flush().unwrap();
908        db.put(b"a", b"new-a").unwrap();
909        db.put(b"b", b"2").unwrap();
910        db.delete(b"c").unwrap();
911        db.flush().unwrap();
912        let got: Vec<_> = db.scan(..).unwrap().collect();
913        assert_eq!(
914            got,
915            vec![
916                (b"a".to_vec(), b"new-a".to_vec()),
917                (b"b".to_vec(), b"2".to_vec())
918            ]
919        );
920    }
921
922    #[test]
923    fn test_scan_bounded_range() {
924        let (_d, db) = db();
925        for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
926            db.put(k.as_bytes(), v.as_bytes()).unwrap();
927        }
928        let got: Vec<_> = db.scan(b"b".to_vec()..b"d".to_vec()).unwrap().collect();
929        assert_eq!(
930            got,
931            vec![
932                (b"b".to_vec(), b"2".to_vec()),
933                (b"c".to_vec(), b"3".to_vec())
934            ]
935        );
936    }
937
938    #[test]
939    fn test_empty_value_roundtrips_through_flush() {
940        let (_d, db) = db_no_autocompact();
941        db.put(b"k", b"").unwrap();
942        db.flush().unwrap();
943        assert_eq!(db.get(b"k").unwrap(), Some(Vec::new()));
944        db.compact_now().unwrap();
945        assert_eq!(db.get(b"k").unwrap(), Some(Vec::new()));
946    }
947
948    #[test]
949    fn test_engine_is_send_and_sync() {
950        fn assert_send_sync<T: Send + Sync>() {}
951        assert_send_sync::<Lsm>();
952    }
953
954    /// The bloom-filter contract (`bloom` feature): a negative point lookup
955    /// reads no data blocks, because every run's filter rejects the absent key,
956    /// while a positive lookup still reads a block. This is the deterministic,
957    /// CI-enforced form of the 0.5 exit criterion.
958    #[cfg(feature = "bloom")]
959    #[test]
960    fn test_bloom_skips_blocks_on_negative_lookup() {
961        use crate::sstable::block_reads;
962
963        let (_d, db) = db_no_autocompact();
964        // Several runs, each covering an overlapping key range, so an absent key
965        // would otherwise force one candidate-block read per run.
966        for run in 0..6u32 {
967            for i in 0..50u32 {
968                let key = format!("k{:04}", i * 2); // even keys only
969                db.put(key.as_bytes(), format!("r{run}").as_bytes())
970                    .unwrap();
971            }
972            db.flush().unwrap();
973        }
974        assert_eq!(db.run_count(), 6);
975
976        // Negative lookup for an odd key that sorts *inside* every run's range.
977        block_reads::reset();
978        assert_eq!(db.get(b"k0051").unwrap(), None);
979        assert_eq!(
980            block_reads::count(),
981            0,
982            "bloom filters must let a negative lookup skip every run with no block read"
983        );
984
985        // A positive lookup does read a block (the counter is wired correctly).
986        block_reads::reset();
987        assert!(db.get(b"k0010").unwrap().is_some());
988        assert!(
989            block_reads::count() >= 1,
990            "a hit must read at least one block"
991        );
992    }
993
994    /// Compaction installs a sidecar for its output and removes the obsoleted
995    /// inputs' sidecars, leaving exactly one sidecar per live run.
996    #[cfg(feature = "bloom")]
997    #[test]
998    fn test_bloom_sidecars_track_runs_through_compaction() {
999        let count = |dir: &std::path::Path, suffix: &str| {
1000            std::fs::read_dir(dir)
1001                .unwrap()
1002                .filter(|e| {
1003                    e.as_ref()
1004                        .unwrap()
1005                        .file_name()
1006                        .to_string_lossy()
1007                        .ends_with(suffix)
1008                })
1009                .count()
1010        };
1011
1012        let (dir, db) = db_no_autocompact();
1013        for i in 0..5u32 {
1014            db.put(format!("k{i}").into_bytes(), b"v").unwrap();
1015            db.flush().unwrap();
1016        }
1017        assert_eq!(count(dir.path(), ".sst.bloom"), 5);
1018
1019        db.compact_now().unwrap();
1020        assert_eq!(db.run_count(), 1);
1021        // Exactly one run and one sidecar; the obsoleted inputs' sidecars are
1022        // gone (dropped alongside their runs).
1023        assert_eq!(count(dir.path(), ".sst"), 1);
1024        assert_eq!(count(dir.path(), ".sst.bloom"), 1);
1025        for i in 0..5u32 {
1026            assert_eq!(
1027                db.get(format!("k{i}").into_bytes()).unwrap(),
1028                Some(b"v".to_vec())
1029            );
1030        }
1031    }
1032
1033    /// With the block cache on (the default), a repeat lookup of the same key
1034    /// serves its block from cache and reads no data block.
1035    #[cfg(feature = "bloom")]
1036    #[test]
1037    fn test_block_cache_serves_repeat_lookup() {
1038        use crate::sstable::block_reads;
1039
1040        let (_d, db) = db(); // default config: 8 MiB block cache
1041        db.put(b"k", b"v").unwrap();
1042        db.flush().unwrap();
1043
1044        block_reads::reset();
1045        assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
1046        assert!(block_reads::count() >= 1, "cold lookup reads its block");
1047
1048        block_reads::reset();
1049        assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
1050        assert_eq!(
1051            block_reads::count(),
1052            0,
1053            "a repeat lookup must be served from the block cache"
1054        );
1055    }
1056
1057    /// With the block cache disabled, every lookup reads its block.
1058    #[cfg(feature = "bloom")]
1059    #[test]
1060    fn test_block_cache_disabled_always_reads() {
1061        use crate::sstable::block_reads;
1062
1063        let dir = tempfile::tempdir().unwrap();
1064        let db = Lsm::open_with(dir.path(), LsmConfig::new().block_cache_capacity(0)).unwrap();
1065        db.put(b"k", b"v").unwrap();
1066        db.flush().unwrap();
1067
1068        for _ in 0..2 {
1069            block_reads::reset();
1070            assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
1071            assert!(
1072                block_reads::count() >= 1,
1073                "with the cache off, every lookup reads its block"
1074            );
1075        }
1076    }
1077}