// tiny_lsm/lib.rs

//! `tiny-lsm` is a dead-simple in-memory LSM for managing
//! fixed-size metadata in more complex systems.
//!
//! Uses crc32fast to checksum all key-value pairs in the log and
//! sstables. Uses zstd to compress all sstables. Performs sstable
//! compaction in the background.
//!
//! Because the data is in-memory, there is no need to put bloom
//! filters on the sstables, and read operations cannot fail due
//! to IO issues.
//!
//! `Lsm` implements `Deref<Target=BTreeMap<[u8; K], [u8; V]>>`
//! to immutably access the data directly without any IO or
//! blocking.
//!
//! `Lsm::insert` writes all data into a 32kb `BufWriter`
//! in front of a log file, so it will block for very
//! short periods of time here and there. SST compaction
//! is handled completely in the background.
//!
//! This is a bad choice for large data sets if you
//! require quick recovery time, because it needs to read all of
//! the sstables and the write-ahead log when starting up.
//!
//! The benefit of using tiered sstables at all, despite being
//! in-memory, is that they act as an effective log-deduplication
//! mechanism, keeping space amplification very low.
//!
//! Maximum throughput is not the goal of this project. Low space
//! amplification and very simple code are the goals, because this
//! is intended to maintain metadata in more complex systems.
//!
//! There is currently no compaction throttling. You can tune the
//! compaction-related `Config` options to change compaction
//! characteristics.
//!
//! Never change the constant size of keys or values for an existing
//! database.
//!
//! # Examples
//!
//! ```
//! // open up the LSM
//! let mut lsm = tiny_lsm::Lsm::recover("path/to/base/dir").expect("recover lsm");
//!
//! // store some things
//! let key: [u8; 8] = 8_u64.to_le_bytes();
//! let value: [u8; 1] = 255_u8.to_le_bytes();
//! lsm.insert(key, value).expect("insert");
//!
//! assert_eq!(lsm.get(&key), Some(&value));
//!
//! ```
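//!
//! A sketch of removal and of read-only scans through the
//! `Deref<Target = BTreeMap<..>>` implementation (the path below is
//! illustrative):
//!
//! ```
//! let mut lsm: tiny_lsm::Lsm<8, 1> =
//!     tiny_lsm::Lsm::recover("path/to/scan/dir").expect("recover lsm");
//!
//! lsm.insert(3_u64.to_le_bytes(), [30]).expect("insert");
//! lsm.insert(4_u64.to_le_bytes(), [40]).expect("insert");
//! lsm.remove(&4_u64.to_le_bytes()).expect("remove");
//!
//! // any read-only `BTreeMap` method is available through `Deref`
//! assert!(lsm.contains_key(&3_u64.to_le_bytes()));
//! assert!(!lsm.contains_key(&4_u64.to_le_bytes()));
//! assert!(lsm.range(3_u64.to_le_bytes()..).next().is_some());
//! ```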
#![cfg_attr(test, feature(no_coverage))]

use std::collections::BTreeMap;
use std::fs;
use std::io::{self, prelude::*, BufReader, BufWriter, Result};
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    mpsc, Arc,
};

const SSTABLE_DIR: &str = "sstables";
const U64_SZ: usize = std::mem::size_of::<u64>();

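/// Tuning knobs for log rollover, buffering, and background sstable
/// compaction. All fields are safe to change across restarts.
///
/// A minimal sketch of overriding a couple of fields while keeping the
/// rest at their defaults (the values and path here are illustrative):
///
/// ```
/// let config = tiny_lsm::Config {
///     max_log_length: 8 * 1024 * 1024,
///     zstd_sstable_compression_level: 7,
///     ..Default::default()
/// };
///
/// let _lsm: tiny_lsm::Lsm<8, 8> =
///     tiny_lsm::Lsm::recover_with_config("path/to/config/dir", config)
///         .expect("recover lsm");
/// ```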
#[derive(Debug, Clone, Copy)]
#[cfg_attr(
    test,
    derive(serde::Serialize, serde::Deserialize, fuzzcheck::DefaultMutator)
)]
pub struct Config {
    /// If on-disk uncompressed sstable data exceeds in-memory usage
    /// by this proportion, a full compaction of all sstables will
    /// occur. This is only likely to happen in situations where
    /// multiple versions of most of the database's keys exist
    /// in multiple sstables, but should never happen for workloads
    /// where mostly new keys are being written.
    pub max_space_amp: u8,
    /// When the log file exceeds this size, a new compressed
    /// and compacted sstable will be flushed to disk and the
    /// log file will be truncated.
    pub max_log_length: usize,
    /// When the background compactor thread looks for contiguous
    /// ranges of sstables to merge, it will require all sstables
    /// to be at least 1/`merge_ratio` * the size of the first sstable
    /// in the contiguous window under consideration.
    pub merge_ratio: u8,
    /// When the background compactor thread looks for ranges of
    /// sstables to merge, it will require ranges to be at least
    /// this long.
    pub merge_window: u8,
    /// All inserts go directly to a `BufWriter` wrapping the log
    /// file. This option determines how large that in-memory buffer
    /// is.
    pub log_bufwriter_size: u32,
    /// The zstd compression level to use when writing sstables.
    pub zstd_sstable_compression_level: u8,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            max_space_amp: 2,
            max_log_length: 32 * 1024 * 1024,
            merge_ratio: 3,
            merge_window: 10,
            log_bufwriter_size: 32 * 1024,
            zstd_sstable_compression_level: 3,
        }
    }
}

struct WorkerStats {
    read_bytes: AtomicU64,
    written_bytes: AtomicU64,
}

#[derive(Debug, Clone, Copy)]
pub struct Stats {
    pub resident_bytes: u64,
    pub on_disk_bytes: u64,
    pub logged_bytes: u64,
    pub written_bytes: u64,
    pub read_bytes: u64,
    pub space_amp: f64,
    pub write_amp: f64,
}

fn hash<const K: usize, const V: usize>(k: &[u8; K], v: &Option<[u8; V]>) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&[v.is_some() as u8]);
    hasher.update(&*k);

    if let Some(v) = v {
        hasher.update(v);
    } else {
        hasher.update(&[0; V]);
    }

    // we XOR the hash to make sure it's something other than 0 when empty,
    // because 0 is an easy value to create accidentally or via corruption.
    hasher.finalize() ^ 0xFF
}

#[inline]
fn hash_batch_len(len: usize) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&(len as u64).to_le_bytes());

    hasher.finalize() ^ 0xFF
}

enum WorkerMessage {
    NewSST { id: u64, sst_sz: u64, db_sz: u64 },
    Stop(mpsc::Sender<()>),
    Heartbeat(mpsc::Sender<()>),
}

struct Worker<const K: usize, const V: usize> {
    sstable_directory: BTreeMap<u64, u64>,
    inbox: mpsc::Receiver<WorkerMessage>,
    db_sz: u64,
    path: PathBuf,
    config: Config,
    stats: Arc<WorkerStats>,
}

impl<const K: usize, const V: usize> Worker<K, V> {
    #[cfg(not(test))]
    fn run(mut self) {
        while self.tick() {}
        log::info!("tiny-lsm compaction worker quitting");
    }

    fn tick(&mut self) -> bool {
        match self.inbox.recv() {
            Ok(message) => {
                if !self.handle_message(message) {
                    return false;
                }
            }
            Err(mpsc::RecvError) => {
                return false;
            }
        }

        // only compact one run at a time before checking
        // for new messages.
        if let Err(e) = self.sstable_maintenance() {
            log::error!(
                "error while compacting sstables \
                in the background: {:?}",
                e
            );
        }

        true
    }

    fn handle_message(&mut self, message: WorkerMessage) -> bool {
        match message {
            WorkerMessage::NewSST { id, sst_sz, db_sz } => {
                self.db_sz = db_sz;
                self.sstable_directory.insert(id, sst_sz);
                true
            }
            WorkerMessage::Stop(dropper) => {
                drop(dropper);
                false
            }
            WorkerMessage::Heartbeat(dropper) => {
                drop(dropper);
                true
            }
        }
    }

    fn sstable_maintenance(&mut self) -> Result<()> {
        let on_disk_size: u64 = self.sstable_directory.values().sum();

        log::debug!("disk size: {} mem size: {}", on_disk_size, self.db_sz);
        if self.sstable_directory.len() > 1
            && on_disk_size / (self.db_sz + 1) > self.config.max_space_amp as u64
        {
            log::debug!(
                "performing full compaction, decompressed on-disk \
                database size has grown beyond {}x the in-memory size",
                self.config.max_space_amp
            );
            let run_to_compact: Vec<u64> = self.sstable_directory.keys().copied().collect();

            self.compact_sstable_run(&run_to_compact)?;
            return Ok(());
        }

        if self.sstable_directory.len() < self.config.merge_window.max(2) as usize {
            return Ok(());
        }

        for window in self
            .sstable_directory
            .iter()
            .collect::<Vec<_>>()
            .windows(self.config.merge_window.max(2) as usize)
        {
            if window
                .iter()
                .skip(1)
                .all(|w| *w.1 * self.config.merge_ratio as u64 > *window[0].1)
            {
                let run_to_compact: Vec<u64> = window.iter().map(|(id, _sum)| **id).collect();

                self.compact_sstable_run(&run_to_compact)?;
                return Ok(());
            }
        }

        Ok(())
    }

    // This function must be able to crash at any point
    // without leaving the system in an unrecoverable state
    // or losing data. This function must be nullipotent from
    // the external API surface's perspective.
    fn compact_sstable_run(&mut self, sstable_ids: &[u64]) -> Result<()> {
        log::debug!(
            "trying to compact sstable_ids {:?}",
            sstable_ids
                .iter()
                .map(|id| id_format(*id))
                .collect::<Vec<_>>()
        );

        let mut map = BTreeMap::new();

        let mut read_pairs = 0;

        for sstable_id in sstable_ids {
            for (k, v) in read_sstable::<K, V>(&self.path, *sstable_id)? {
                map.insert(k, v);
                read_pairs += 1;
            }
        }

        self.stats
            .read_bytes
            .fetch_add(read_pairs * (4 + 1 + K + V) as u64, Ordering::Relaxed);

        let sst_id = sstable_ids
            .iter()
            .max()
            .expect("compact_sstable_run called with empty set of sst ids");

        write_sstable(&self.path, *sst_id, &map, true, &self.config)?;

        self.stats
            .written_bytes
            .fetch_add(map.len() as u64 * (4 + 1 + K + V) as u64, Ordering::Relaxed);

        let sst_sz = map.len() as u64 * (4 + K + V) as u64;
        self.sstable_directory.insert(*sst_id, sst_sz);

        log::debug!("compacted range into sstable {}", id_format(*sst_id));

        for sstable_id in sstable_ids {
            if sstable_id == sst_id {
                continue;
            }
            fs::remove_file(self.path.join(SSTABLE_DIR).join(id_format(*sstable_id)))?;
            self.sstable_directory
                .remove(sstable_id)
                .expect("compacted sst not present in sstable_directory");
        }
        fs::File::open(self.path.join(SSTABLE_DIR))?.sync_all()?;

        Ok(())
    }
}

fn id_format(id: u64) -> String {
    format!("{:016x}", id)
}

fn list_sstables(path: &Path, remove_tmp: bool) -> Result<BTreeMap<u64, u64>> {
    let mut sstable_map = BTreeMap::new();

    for dir_entry_res in fs::read_dir(path.join(SSTABLE_DIR))? {
        let dir_entry = dir_entry_res?;
        let file_name = if let Ok(f) = dir_entry.file_name().into_string() {
            f
        } else {
            continue;
        };

        if let Ok(id) = u64::from_str_radix(&file_name, 16) {
            let metadata = dir_entry.metadata()?;

            sstable_map.insert(id, metadata.len());
        } else if remove_tmp && file_name.ends_with("-tmp") {
            log::warn!("removing incomplete sstable rewrite {}", file_name);
            fs::remove_file(path.join(SSTABLE_DIR).join(file_name))?;
        }
    }

    Ok(sstable_map)
}

fn write_sstable<const K: usize, const V: usize>(
    path: &Path,
    id: u64,
    items: &BTreeMap<[u8; K], Option<[u8; V]>>,
    tmp_mv: bool,
    config: &Config,
) -> Result<()> {
    let sst_dir_path = path.join(SSTABLE_DIR);
    let sst_path = if tmp_mv {
        sst_dir_path.join(format!("{:x}-tmp", id))
    } else {
        sst_dir_path.join(id_format(id))
    };

    let file = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .open(&sst_path)?;

    let max_zstd_level = zstd::compression_level_range();
    let zstd_level = config
        .zstd_sstable_compression_level
        .min(*max_zstd_level.end() as u8);

    let mut bw =
        BufWriter::new(zstd::Encoder::new(file, zstd_level as _).expect("zstd encoder failure"));

    bw.write_all(&(items.len() as u64).to_le_bytes())?;

    for (k, v) in items {
        let crc: u32 = hash(k, v);
        bw.write_all(&crc.to_le_bytes())?;
        bw.write_all(&[v.is_some() as u8])?;
        bw.write_all(k)?;

        if let Some(v) = v {
            bw.write_all(v)?;
        } else {
            bw.write_all(&[0; V])?;
        }
    }

    bw.flush()?;

    bw.get_mut().get_mut().sync_all()?;
    fs::File::open(path.join(SSTABLE_DIR))?.sync_all()?;

    if tmp_mv {
        let new_path = sst_dir_path.join(id_format(id));
        fs::rename(sst_path, new_path)?;
    }

    Ok(())
}

fn read_sstable<const K: usize, const V: usize>(
    path: &Path,
    id: u64,
) -> Result<Vec<([u8; K], Option<[u8; V]>)>> {
    let file = fs::OpenOptions::new()
        .read(true)
        .open(path.join(SSTABLE_DIR).join(id_format(id)))?;

    let mut reader = zstd::Decoder::new(BufReader::with_capacity(16 * 1024 * 1024, file)).unwrap();

    // crc + tombstone discriminant + key + value
    let mut buf = vec![0; 4 + 1 + K + V];

    let len_buf = &mut [0; 8];

    reader.read_exact(len_buf)?;

    let expected_len: u64 = u64::from_le_bytes(*len_buf);
    let mut sstable = Vec::with_capacity(expected_len as usize);

    while let Ok(()) = reader.read_exact(&mut buf) {
        let crc_expected: u32 = u32::from_le_bytes(buf[0..4].try_into().unwrap());
        let d: bool = match buf[4] {
            0 => false,
            1 => true,
            _ => {
                log::warn!("detected torn-write while reading sstable {:016x}", id);
                break;
            }
        };
        let k: [u8; K] = buf[5..5 + K].try_into().unwrap();
        let v: Option<[u8; V]> = if d {
            Some(buf[5 + K..5 + K + V].try_into().unwrap())
        } else {
            None
        };
        let crc_actual: u32 = hash(&k, &v);

        if crc_expected != crc_actual {
            log::warn!("detected torn-write while reading sstable {:016x}", id);
            break;
        }

        sstable.push((k, v));
    }

    if sstable.len() as u64 != expected_len {
        log::warn!(
            "sstable {:016x} tear detected - process probably crashed \
            before full sstable could be written out",
            id
        );
    }

    Ok(sstable)
}

pub struct Lsm<const K: usize, const V: usize> {
    // `BufWriter` flushes on drop
    memtable: BTreeMap<[u8; K], Option<[u8; V]>>,
    db: BTreeMap<[u8; K], [u8; V]>,
    worker_outbox: mpsc::Sender<WorkerMessage>,
    next_sstable_id: u64,
    dirty_bytes: usize,
    #[cfg(test)]
    worker: Worker<K, V>,
    #[cfg(test)]
    pub log: tearable::Tearable<fs::File>,
    #[cfg(not(test))]
    log: BufWriter<fs::File>,
    path: PathBuf,
    config: Config,
    stats: Stats,
    worker_stats: Arc<WorkerStats>,
}

impl<const K: usize, const V: usize> Drop for Lsm<K, V> {
    fn drop(&mut self) {
        let (tx, rx) = mpsc::channel();

        if self.worker_outbox.send(WorkerMessage::Stop(tx)).is_err() {
            log::error!("failed to shut down compaction worker on Lsm drop");
            return;
        }

        #[cfg(test)]
        assert!(!self.worker.tick());

        for _ in rx {}
    }
}

impl<const K: usize, const V: usize> std::ops::Deref for Lsm<K, V> {
    type Target = BTreeMap<[u8; K], [u8; V]>;

    fn deref(&self) -> &Self::Target {
        &self.db
    }
}

impl<const K: usize, const V: usize> Lsm<K, V> {
    /// Recover the LSM off disk. Make sure to never
    /// recover a DB using different K, V parameters than
    /// it was created with, or there may be data loss.
    ///
    /// This is an O(N) operation and involves reading
    /// all previously written sstables and the log,
    /// to recover all data into an in-memory `BTreeMap`.
    pub fn recover<P: AsRef<Path>>(p: P) -> Result<Lsm<K, V>> {
        Lsm::recover_with_config(p, Config::default())
    }

    /// Recover the LSM, and provide custom options
    /// around IO and merging. All values in the `Config`
    /// object are safe to change across restarts, unlike
    /// the fixed K and V lengths for data in the database.
    pub fn recover_with_config<P: AsRef<Path>>(p: P, config: Config) -> Result<Lsm<K, V>> {
        let path = p.as_ref();
        if !path.exists() {
            fs::create_dir_all(path)?;
            fs::create_dir(path.join(SSTABLE_DIR))?;
            fs::File::open(path.join(SSTABLE_DIR))?.sync_all()?;
            fs::File::open(path)?.sync_all()?;
            let mut parent_opt = path.parent();

            // need to recursively fsync parents since
            // we used create_dir_all
            while let Some(parent) = parent_opt {
                if parent.file_name().is_none() {
                    break;
                }
                if fs::File::open(parent).and_then(|f| f.sync_all()).is_err() {
                    // we made a reasonable attempt, but permissions
                    // can sometimes get in the way, and at this point it's
                    // becoming pedantic.
                    break;
                }
                parent_opt = parent.parent();
            }
        }

        let sstable_directory = list_sstables(path, true)?;

        let mut db = BTreeMap::new();
        for sstable_id in sstable_directory.keys() {
            for (k, v) in read_sstable::<K, V>(path, *sstable_id)? {
                if let Some(v) = v {
                    db.insert(k, v);
                } else {
                    db.remove(&k);
                }
            }
        }

        let max_sstable_id = sstable_directory.keys().next_back().copied();

        let log = fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(path.join("log"))?;

        let mut reader = BufReader::new(log);

        let tuple_sz = U64_SZ.max(K + V);
        let header_sz = 5;
        let header_tuple_sz = header_sz + tuple_sz;
        let mut buf = vec![0; header_tuple_sz];

        let mut memtable = BTreeMap::new();
        let mut recovered = 0;

        // write_batch is the pending memtable updates, the number
        // of remaining items in the write batch, and the number of
        // bytes that have been recovered in the write batch.
        let mut write_batch: Option<(_, usize, u64)> = None;
        while let Ok(()) = reader.read_exact(&mut buf) {
            let crc_expected: u32 = u32::from_le_bytes(buf[0..4].try_into().unwrap());
            let d: bool = match buf[4] {
                0 => false,
                1 => true,
                2 if write_batch.is_none() => {
                    // begin batch
                    let batch_sz_buf: [u8; 8] = buf[5..5 + 8].try_into().unwrap();
                    let batch_sz: u64 = u64::from_le_bytes(batch_sz_buf);
                    log::debug!("processing batch of len {}", batch_sz);

                    let crc_actual = hash_batch_len(usize::try_from(batch_sz).unwrap());
                    if crc_expected != crc_actual {
                        log::warn!("crc mismatch for batch size marker");
                        break;
                    }

                    if !buf[5 + U64_SZ..].iter().all(|e| *e == 0) {
                        log::warn!(
                            "expected all pad bytes after logged \
                            batch manifests to be zero, but some \
                            corruption was detected"
                        );
                        break;
                    }

                    if batch_sz > usize::MAX as u64 {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "recovering a batch size over usize::MAX is not supported",
                        ));
                    }

                    let wb_remaining = batch_sz as usize;
                    let wb_recovered = buf.len() as u64;

                    if wb_remaining > 0 {
                        write_batch = Some((
                            Vec::with_capacity(batch_sz as usize),
                            wb_remaining,
                            wb_recovered,
                        ));
                    } else {
                        recovered += buf.len() as u64;
                    }

                    continue;
                }
                _ => {
                    log::warn!("invalid log message discriminant detected: {}", buf[4]);
                    break;
                }
            };
            let k: [u8; K] = buf[5..5 + K].try_into().unwrap();
            let v: Option<[u8; V]> = if d {
                Some(buf[5 + K..5 + K + V].try_into().unwrap())
            } else {
                None
            };

            let crc_actual: u32 = hash(&k, &v);

            if crc_expected != crc_actual {
                log::warn!(
                    "crc mismatch for kv pair {:?}-{:?}: expected {} actual {}, torn log detected",
                    k,
                    v,
                    crc_expected,
                    crc_actual
                );
                break;
            }

            let pad_start = if v.is_some() { 5 + K + V } else { 5 + K };

            if !buf[pad_start..].iter().all(|e| *e == 0) {
                log::warn!(
                    "expected all pad bytes for logged kv entries \
                    to be zero, but some corruption was detected"
                );
                break;
            }

            if let Some((mut wb, mut wb_remaining, mut wb_recovered)) = write_batch.take() {
                wb.push((k, v));
                wb_remaining = wb_remaining.checked_sub(1).unwrap();
                wb_recovered = wb_recovered.checked_add(buf.len() as u64).unwrap();

                // apply the write batch all at once
                // or never at all
                if wb_remaining == 0 {
                    for (k, v) in wb {
                        memtable.insert(k, v);

                        if let Some(v) = v {
                            db.insert(k, v);
                        } else {
                            db.remove(&k);
                        }
                    }
                    recovered += wb_recovered;
                } else {
                    write_batch = Some((wb, wb_remaining, wb_recovered));
                }
            } else {
                memtable.insert(k, v);

                if let Some(v) = v {
                    db.insert(k, v);
                } else {
                    db.remove(&k);
                }

                recovered += buf.len() as u64;
            }
        }

        // need to back up a few bytes to chop off the torn log
        log::debug!("recovered {} kv pairs", db.len());
        log::debug!("rewinding log down to length {}", recovered);
        let log_file = reader.get_mut();
        log_file.seek(io::SeekFrom::Start(recovered))?;
        log_file.set_len(recovered)?;
        log_file.sync_all()?;
        fs::File::open(path.join(SSTABLE_DIR))?.sync_all()?;

        let (tx, rx) = mpsc::channel();

        let worker_stats = Arc::new(WorkerStats {
            read_bytes: 0.into(),
            written_bytes: 0.into(),
        });

        let worker: Worker<K, V> = Worker {
            path: path.clone().into(),
            sstable_directory,
            inbox: rx,
            db_sz: db.len() as u64 * (K + V) as u64,
            config,
            stats: worker_stats.clone(),
        };

        #[cfg(not(test))]
        std::thread::spawn(move || worker.run());

        let (hb_tx, hb_rx) = mpsc::channel();
        tx.send(WorkerMessage::Heartbeat(hb_tx)).unwrap();

        #[cfg(test)]
        let mut worker = worker;

        #[cfg(test)]
        assert!(worker.tick());

        for _ in hb_rx {}

        let lsm = Lsm {
            #[cfg(not(test))]
            log: BufWriter::with_capacity(config.log_bufwriter_size as usize, reader.into_inner()),
            #[cfg(test)]
            log: tearable::Tearable::new(reader.into_inner()),
            #[cfg(test)]
            worker,
            path: path.into(),
            next_sstable_id: max_sstable_id.unwrap_or(0) + 1,
            dirty_bytes: recovered as usize,
            worker_outbox: tx,
            config,
            stats: Stats {
                logged_bytes: recovered,
                on_disk_bytes: 0,
                read_bytes: 0,
                written_bytes: 0,
                resident_bytes: db.len() as u64 * (K + V) as u64,
                space_amp: 0.,
                write_amp: 0.,
            },
            worker_stats,
            db,
            memtable,
        };

        Ok(lsm)
    }

    /// Writes a KV pair into the `Lsm`, returning the
    /// previous value if it existed. This operation might
    /// involve blocking for a very brief moment as a 32kb
    /// `BufWriter` wrapping the log file is flushed.
    ///
    /// If you require blocking until all written data is
    /// durable, use the `Lsm::flush` method below.
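    ///
    /// A minimal sketch of the previous-value semantics (the path below
    /// is illustrative):
    ///
    /// ```
    /// let mut lsm: tiny_lsm::Lsm<8, 1> =
    ///     tiny_lsm::Lsm::recover("path/to/insert/dir").expect("recover lsm");
    ///
    /// let key = 7_u64.to_le_bytes();
    /// lsm.insert(key, [1]).expect("insert");
    /// let previous = lsm.insert(key, [2]).expect("insert");
    ///
    /// assert_eq!(previous, Some([1]));
    /// assert_eq!(lsm.get(&key), Some(&[2]));
    /// ```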
    pub fn insert(&mut self, k: [u8; K], v: [u8; V]) -> Result<Option<[u8; V]>> {
        self.log_mutation(k, Some(v))?;

        if self.dirty_bytes > self.config.max_log_length {
            self.flush()?;
        }

        Ok(self.db.insert(k, v))
    }

    /// Removes a KV pair from the `Lsm`, returning the
    /// previous value if it existed. This operation might
    /// involve blocking for a very brief moment as a 32kb
    /// `BufWriter` wrapping the log file is flushed.
    ///
    /// If you require blocking until all written data is
    /// durable, use the `Lsm::flush` method below.
    pub fn remove(&mut self, k: &[u8; K]) -> Result<Option<[u8; V]>> {
        self.log_mutation(*k, None)?;

        if self.dirty_bytes > self.config.max_log_length {
            self.flush()?;
        }

        Ok(self.db.remove(k))
    }

    /// Apply a set of updates to the `Lsm` and
    /// log them to disk in a way that will only
    /// be recovered if every update in the batch
    /// is present.
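    ///
    /// A minimal sketch of an all-or-nothing batch containing one insert
    /// and one removal (the path below is illustrative):
    ///
    /// ```
    /// let mut lsm: tiny_lsm::Lsm<8, 8> =
    ///     tiny_lsm::Lsm::recover("path/to/write_batch/dir").expect("recover lsm");
    ///
    /// let batch = vec![
    ///     (1_u64.to_le_bytes(), Some(10_u64.to_le_bytes())),
    ///     (2_u64.to_le_bytes(), None),
    /// ];
    /// lsm.write_batch(&batch).expect("write batch");
    ///
    /// assert_eq!(lsm.get(&1_u64.to_le_bytes()), Some(&10_u64.to_le_bytes()));
    /// assert_eq!(lsm.get(&2_u64.to_le_bytes()), None);
    /// ```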
    pub fn write_batch(&mut self, write_batch: &[([u8; K], Option<[u8; V]>)]) -> Result<()> {
        let batch_len: [u8; 8] = (write_batch.len() as u64).to_le_bytes();
        let crc = hash_batch_len(write_batch.len());

        self.log.write_all(&crc.to_le_bytes())?;
        self.log.write_all(&[2_u8])?;
        self.log.write_all(&batch_len)?;

        // the zero pad is necessary because every log
        // entry must have the same length, whether
        // it's a batch size or actual kv tuple.
        let tuple_sz = U64_SZ.max(K + V);
        let pad_sz = tuple_sz - U64_SZ;
        let pad = [0; U64_SZ];
        self.log.write_all(&pad[..pad_sz])?;

        for (k, v_opt) in write_batch {
            if let Some(v) = v_opt {
                self.db.insert(*k, *v);
            } else {
                self.db.remove(k);
            }

            self.log_mutation(*k, *v_opt)?;
            self.memtable.insert(*k, *v_opt);
        }

        if self.dirty_bytes > self.config.max_log_length {
            self.flush()?;
        }

        Ok(())
    }

    fn log_mutation(&mut self, k: [u8; K], v: Option<[u8; V]>) -> Result<()> {
        let crc: u32 = hash(&k, &v);
        self.log.write_all(&crc.to_le_bytes())?;
        self.log.write_all(&[v.is_some() as u8])?;
        self.log.write_all(&k)?;

        if let Some(v) = v {
            self.log.write_all(&v)?;
        } else {
            self.log.write_all(&[0; V])?;
        };

        // the zero pad is necessary because every log
        // entry must have the same length, whether
        // it's a batch size or actual kv tuple.
        let min_tuple_sz = U64_SZ.max(K + V);
        let pad_sz = min_tuple_sz - (K + V);
        let pad = [0; U64_SZ];
        self.log.write_all(&pad[..pad_sz])?;

        let logged_bytes = 4 + 1 + min_tuple_sz;

        self.memtable.insert(k, v);

        self.dirty_bytes += logged_bytes;
        self.stats.logged_bytes += logged_bytes as u64;
        self.stats.written_bytes += logged_bytes as u64;

        Ok(())
    }

    /// Blocks until all log data has been
    /// written out to disk and fsynced. If
    /// the log file has grown above a certain
    /// threshold, it will be compacted into
    /// a new sstable and the log file will
    /// be truncated after the sstable has
    /// been written, fsynced, and the sstable
    /// directory has been fsynced.
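    ///
    /// A minimal sketch of making a write durable before acknowledging it
    /// (the path below is illustrative):
    ///
    /// ```
    /// let mut lsm: tiny_lsm::Lsm<8, 1> =
    ///     tiny_lsm::Lsm::recover("path/to/flush/dir").expect("recover lsm");
    ///
    /// lsm.insert(9_u64.to_le_bytes(), [9]).expect("insert");
    ///
    /// // the insert above may still be sitting in the log's `BufWriter`;
    /// // flush writes it out and fsyncs before we treat it as durable
    /// lsm.flush().expect("flush");
    /// ```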
    pub fn flush(&mut self) -> Result<()> {
        #[cfg(test)]
        {
            if self.log.tearing {
                return Ok(());
            }
        }

        self.log.flush()?;
        self.log.get_mut().sync_all()?;

        if self.dirty_bytes > self.config.max_log_length {
            log::debug!("compacting log to sstable");
            let memtable = std::mem::take(&mut self.memtable);
            let sst_id = self.next_sstable_id;
            if let Err(e) = write_sstable(&self.path, sst_id, &memtable, false, &self.config) {
                // put memtable back together before returning
                self.memtable = memtable;
                log::error!("failed to flush lsm log to sstable: {:?}", e);
                return Err(e.into());
            }

            let sst_sz = 8 + (memtable.len() as u64 * (4 + K + V) as u64);
            let db_sz = self.db.len() as u64 * (K + V) as u64;

            if let Err(e) = self.worker_outbox.send(WorkerMessage::NewSST {
                id: sst_id,
                sst_sz,
                db_sz,
            }) {
                log::error!("failed to send message to worker: {:?}", e);
                log::logger().flush();
                panic!("failed to send message to worker: {:?}", e);
            }

            #[cfg(test)]
            assert!(self.worker.tick());

            self.next_sstable_id += 1;

            let log_file: &mut fs::File = self.log.get_mut();
            log_file.seek(io::SeekFrom::Start(0))?;
            log_file.set_len(0)?;
            log_file.sync_all()?;
            fs::File::open(self.path.join(SSTABLE_DIR))?.sync_all()?;

            self.dirty_bytes = 0;
        }

        Ok(())
    }

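    /// Returns a snapshot of tree statistics: resident and on-disk sizes,
    /// cumulative logged, read, and written bytes, and the derived space
    /// and write amplification ratios. This reads log and sstable metadata
    /// from disk, which is why it returns a `Result`.
    ///
    /// A minimal sketch (the path below is illustrative):
    ///
    /// ```
    /// let mut lsm: tiny_lsm::Lsm<8, 1> =
    ///     tiny_lsm::Lsm::recover("path/to/stats/dir").expect("recover lsm");
    ///
    /// lsm.insert(5_u64.to_le_bytes(), [5]).expect("insert");
    ///
    /// let stats = lsm.stats().expect("stats");
    /// assert!(stats.resident_bytes > 0);
    /// ```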
    pub fn stats(&mut self) -> Result<Stats> {
        self.stats.written_bytes += self.worker_stats.written_bytes.swap(0, Ordering::Relaxed);
        self.stats.read_bytes += self.worker_stats.read_bytes.swap(0, Ordering::Relaxed);
        self.stats.resident_bytes = self.db.len() as u64 * (K + V) as u64;

        let mut on_disk_bytes: u64 = std::fs::metadata(self.path.join("log"))?.len();

        on_disk_bytes += list_sstables(&self.path, false)?
            .into_iter()
            .map(|(_, len)| len)
            .sum::<u64>();

        self.stats.on_disk_bytes = on_disk_bytes;

        self.stats.write_amp =
            self.stats.written_bytes as f64 / self.stats.on_disk_bytes.max(1) as f64;
        self.stats.space_amp =
            self.stats.on_disk_bytes as f64 / self.stats.resident_bytes.max(1) as f64;
        Ok(self.stats)
    }
}

#[cfg(test)]
mod tearable;

#[cfg(test)]
mod fuzz;