lsm_tree/
abstract.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    blob_tree::FragmentationMap,
7    compaction::CompactionStrategy,
8    config::TreeType,
9    iter_guard::IterGuardImpl,
10    table::Table,
11    tree::inner::MemtableId,
12    version::{SuperVersions, Version},
13    vlog::BlobFile,
14    AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
15    SequenceNumberCounter, TableId, Tree, TreeId, UserKey, UserValue,
16};
17use enum_dispatch::enum_dispatch;
18use std::{
19    ops::RangeBounds,
20    sync::{Arc, MutexGuard, RwLockWriteGuard},
21};
22
23pub type RangeItem = crate::Result<KvPair>;
24
25/// Generic Tree API
26#[enum_dispatch]
27pub trait AbstractTree {
28    #[doc(hidden)]
29    fn next_table_id(&self) -> TableId;
30
31    #[doc(hidden)]
32    fn id(&self) -> TreeId;
33
34    #[doc(hidden)]
35    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
36
37    #[doc(hidden)]
38    fn current_version(&self) -> Version;
39
40    #[doc(hidden)]
41    fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, SuperVersions>;
42
43    /// Seals the active memtable and flushes to table(s).
44    ///
45    /// If there are already other sealed memtables lined up, those will be flushed as well.
46    ///
47    /// Only used in tests.
48    #[doc(hidden)]
49    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
50        let lock = self.get_flush_lock();
51        self.rotate_memtable();
52        self.flush(&lock, eviction_seqno)?;
53        Ok(())
54    }
55
56    /// Synchronously flushes pending sealed memtables to tables.
57    ///
58    /// Returns the sum of flushed memtable sizes that were flushed.
59    ///
60    /// The function may not return a result, if nothing was flushed.
61    ///
62    /// # Errors
63    ///
64    /// Will return `Err` if an IO error occurs.
65    fn flush(
66        &self,
67        _lock: &MutexGuard<'_, ()>,
68        seqno_threshold: SeqNo,
69    ) -> crate::Result<Option<u64>> {
70        use crate::{compaction::stream::CompactionStream, merge::Merger};
71
72        let version_history = self.get_version_history_lock();
73        let latest = version_history.latest_version();
74
75        if latest.sealed_memtables.len() == 0 {
76            return Ok(None);
77        }
78
79        let sealed_ids = latest
80            .sealed_memtables
81            .iter()
82            .map(|mt| mt.0)
83            .collect::<Vec<_>>();
84
85        let flushed_size = latest.sealed_memtables.iter().map(|(_, x)| x.size()).sum();
86
87        let merger = Merger::new(
88            latest
89                .sealed_memtables
90                .iter()
91                .map(|(_, mt)| mt.iter().map(Ok))
92                .collect::<Vec<_>>(),
93        );
94        let stream = CompactionStream::new(merger, seqno_threshold);
95
96        drop(version_history);
97
98        if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
99            self.register_tables(
100                &tables,
101                blob_files.as_deref(),
102                None,
103                &sealed_ids,
104                seqno_threshold,
105            )?;
106        }
107
108        Ok(Some(flushed_size))
109    }
110
111    /// Returns an iterator that scans through the entire tree.
112    ///
113    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
114    fn iter(
115        &self,
116        seqno: SeqNo,
117        index: Option<Arc<Memtable>>,
118    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
119        self.range::<&[u8], _>(.., seqno, index)
120    }
121
122    /// Returns an iterator over a prefixed set of items.
123    ///
124    /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
125    fn prefix<K: AsRef<[u8]>>(
126        &self,
127        prefix: K,
128        seqno: SeqNo,
129        index: Option<Arc<Memtable>>,
130    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
131
132    /// Returns an iterator over a range of items.
133    ///
134    /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
135    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
136        &self,
137        range: R,
138        seqno: SeqNo,
139        index: Option<Arc<Memtable>>,
140    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
141
142    /// Ingests a sorted stream of key-value pairs into the tree.
143    ///
144    /// Can only be called on a new fresh, empty tree.
145    ///
146    /// # Errors
147    ///
148    /// Will return `Err` if an IO error occurs.
149    ///
150    /// # Panics
151    ///
152    /// Panics if the tree is **not** initially empty.
153    ///
154    /// Will panic if the input iterator is not sorted in ascending order.
155    #[doc(hidden)]
156    fn ingest(
157        &self,
158        iter: impl Iterator<Item = (UserKey, UserValue)>,
159        seqno_generator: &SequenceNumberCounter,
160        visible_seqno: &SequenceNumberCounter,
161    ) -> crate::Result<()>;
162
163    /// Returns the approximate number of tombstones in the tree.
164    fn tombstone_count(&self) -> u64;
165
166    /// Returns the approximate number of weak tombstones (single deletes) in the tree.
167    fn weak_tombstone_count(&self) -> u64;
168
169    /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
170    fn weak_tombstone_reclaimable_count(&self) -> u64;
171
172    // TODO: clear() with Nuke compaction strategy (write lock) -> drop_range(..)
173
174    /// Drops tables that are fully contained in a given range.
175    ///
176    /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
177    /// If the normalized lower bound is greater than the upper bound, the
178    /// method returns without performing any work.
179    ///
180    /// # Errors
181    ///
182    /// Will return `Err` only if an IO error occurs during compaction.
183    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
184
185    /// Performs major compaction, blocking the caller until it's done.
186    ///
187    /// # Errors
188    ///
189    /// Will return `Err` if an IO error occurs.
190    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
191
192    /// Returns the disk space used by stale blobs.
193    fn stale_blob_bytes(&self) -> u64 {
194        0
195    }
196
197    /// Gets the space usage of all filters in the tree.
198    ///
199    /// May not correspond to the actual memory size because filter blocks may be paged out.
200    fn filter_size(&self) -> usize;
201
202    /// Gets the memory usage of all pinned filters in the tree.
203    fn pinned_filter_size(&self) -> usize;
204
205    /// Gets the memory usage of all pinned index blocks in the tree.
206    fn pinned_block_index_size(&self) -> usize;
207
208    /// Gets the length of the version free list.
209    fn version_free_list_len(&self) -> usize;
210
211    /// Returns the metrics structure.
212    #[cfg(feature = "metrics")]
213    fn metrics(&self) -> &Arc<crate::Metrics>;
214
215    /// Acquires the flush lock which is required to call [`Tree::flush`].
216    fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
217
218    /// Synchronously flushes a memtable to a table.
219    ///
220    /// This method will not make the table immediately available,
221    /// use [`AbstractTree::register_tables`] for that.
222    ///
223    /// # Errors
224    ///
225    /// Will return `Err` if an IO error occurs.
226    #[warn(clippy::type_complexity)]
227    fn flush_to_tables(
228        &self,
229        stream: impl Iterator<Item = crate::Result<InternalValue>>,
230    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>>;
231
232    /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
233    ///
234    /// # Errors
235    ///
236    /// Will return `Err` if an IO error occurs.
237    fn register_tables(
238        &self,
239        tables: &[Table],
240        blob_files: Option<&[BlobFile]>,
241        frag_map: Option<FragmentationMap>,
242        sealed_memtables_to_delete: &[MemtableId],
243        gc_watermark: SeqNo,
244    ) -> crate::Result<()>;
245
246    /// Clears the active memtable atomically.
247    fn clear_active_memtable(&self);
248
249    /// Sets the active memtable.
250    ///
251    /// May be used to restore the LSM-tree's in-memory state from a write-ahead log
252    /// after tree recovery.
253    fn set_active_memtable(&self, memtable: Memtable);
254
255    /// Returns the number of sealed memtables.
256    fn sealed_memtable_count(&self) -> usize;
257
258    /// Adds a sealed memtables.
259    ///
260    /// May be used to restore the LSM-tree's in-memory state from some journals.
261    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>);
262
263    /// Performs compaction on the tree's levels, blocking the caller until it's done.
264    ///
265    /// # Errors
266    ///
267    /// Will return `Err` if an IO error occurs.
268    fn compact(
269        &self,
270        strategy: Arc<dyn CompactionStrategy>,
271        seqno_threshold: SeqNo,
272    ) -> crate::Result<()>;
273
274    /// Returns the next table's ID.
275    fn get_next_table_id(&self) -> TableId;
276
277    /// Returns the tree config.
278    fn tree_config(&self) -> &Config;
279
280    /// Returns the highest sequence number.
281    fn get_highest_seqno(&self) -> Option<SeqNo> {
282        let memtable_seqno = self.get_highest_memtable_seqno();
283        let table_seqno = self.get_highest_persisted_seqno();
284        memtable_seqno.max(table_seqno)
285    }
286
287    /// Returns the approximate size of the active memtable in bytes.
288    ///
289    /// May be used to flush the memtable if it grows too large.
290    fn active_memtable_size(&self) -> u64;
291
292    /// Returns the tree type.
293    fn tree_type(&self) -> TreeType;
294
295    /// Seals the active memtable.
296    fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
297
298    /// Returns the number of tables currently in the tree.
299    fn table_count(&self) -> usize;
300
301    /// Returns the number of tables in `levels[idx]`.
302    ///
303    /// Returns `None` if the level does not exist (if idx >= 7).
304    fn level_table_count(&self, idx: usize) -> Option<usize>;
305
306    /// Returns the number of disjoint runs in L0.
307    ///
308    /// Can be used to determine whether to write stall.
309    fn l0_run_count(&self) -> usize;
310
311    /// Returns the number of blob files currently in the tree.
312    fn blob_file_count(&self) -> usize;
313
314    /// Approximates the number of items in the tree.
315    fn approximate_len(&self) -> usize;
316
317    /// Returns the disk space usage.
318    fn disk_space(&self) -> u64;
319
320    /// Returns the highest sequence number of the active memtable.
321    fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
322
323    /// Returns the highest sequence number that is flushed to disk.
324    fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
325
326    /// Scans the entire tree, returning the number of items.
327    ///
328    /// ###### Caution
329    ///
330    /// This operation scans the entire tree: O(n) complexity!
331    ///
332    /// Never, under any circumstances, use .`len()` == 0 to check
333    /// if the tree is empty, use [`Tree::is_empty`] instead.
334    ///
335    /// # Examples
336    ///
337    /// ```
338    /// # use lsm_tree::Error as TreeError;
339    /// use lsm_tree::{AbstractTree, Config, Tree};
340    ///
341    /// let folder = tempfile::tempdir()?;
342    /// let tree = Config::new(folder, Default::default()).open()?;
343    ///
344    /// assert_eq!(tree.len(0, None)?, 0);
345    /// tree.insert("1", "abc", 0);
346    /// tree.insert("3", "abc", 1);
347    /// tree.insert("5", "abc", 2);
348    /// assert_eq!(tree.len(3, None)?, 3);
349    /// #
350    /// # Ok::<(), TreeError>(())
351    /// ```
352    ///
353    /// # Errors
354    ///
355    /// Will return `Err` if an IO error occurs.
356    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
357        let mut count = 0;
358
359        for item in self.iter(seqno, index) {
360            let _ = item.key()?;
361            count += 1;
362        }
363
364        Ok(count)
365    }
366
367    /// Returns `true` if the tree is empty.
368    ///
369    /// This operation has O(log N) complexity.
370    ///
371    /// # Examples
372    ///
373    /// ```
374    /// # let folder = tempfile::tempdir()?;
375    /// use lsm_tree::{AbstractTree, Config, Tree};
376    ///
377    /// let tree = Config::new(folder, Default::default()).open()?;
378    /// assert!(tree.is_empty(0, None)?);
379    ///
380    /// tree.insert("a", "abc", 0);
381    /// assert!(!tree.is_empty(1, None)?);
382    /// #
383    /// # Ok::<(), lsm_tree::Error>(())
384    /// ```
385    ///
386    /// # Errors
387    ///
388    /// Will return `Err` if an IO error occurs.
389    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
390        self.first_key_value(seqno, index).map(|x| x.is_none())
391    }
392
393    /// Returns the first key-value pair in the tree.
394    /// The key in this pair is the minimum key in the tree.
395    ///
396    /// # Examples
397    ///
398    /// ```
399    /// # use lsm_tree::Error as TreeError;
400    /// # use lsm_tree::{AbstractTree, Config, Tree};
401    /// #
402    /// # let folder = tempfile::tempdir()?;
403    /// let tree = Config::new(folder, Default::default()).open()?;
404    ///
405    /// tree.insert("1", "abc", 0);
406    /// tree.insert("3", "abc", 1);
407    /// tree.insert("5", "abc", 2);
408    ///
409    /// let (key, _) = tree.first_key_value(3, None)?.expect("item should exist");
410    /// assert_eq!(&*key, "1".as_bytes());
411    /// #
412    /// # Ok::<(), TreeError>(())
413    /// ```
414    ///
415    /// # Errors
416    ///
417    /// Will return `Err` if an IO error occurs.
418    fn first_key_value(
419        &self,
420        seqno: SeqNo,
421        index: Option<Arc<Memtable>>,
422    ) -> crate::Result<Option<KvPair>> {
423        self.iter(seqno, index)
424            .next()
425            .map(Guard::into_inner)
426            .transpose()
427    }
428
429    /// Returns the last key-value pair in the tree.
430    /// The key in this pair is the maximum key in the tree.
431    ///
432    /// # Examples
433    ///
434    /// ```
435    /// # use lsm_tree::Error as TreeError;
436    /// # use lsm_tree::{AbstractTree, Config, Tree};
437    /// #
438    /// # let folder = tempfile::tempdir()?;
439    /// # let tree = Config::new(folder, Default::default()).open()?;
440    /// #
441    /// tree.insert("1", "abc", 0);
442    /// tree.insert("3", "abc", 1);
443    /// tree.insert("5", "abc", 2);
444    ///
445    /// let (key, _) = tree.last_key_value(3, None)?.expect("item should exist");
446    /// assert_eq!(&*key, "5".as_bytes());
447    /// #
448    /// # Ok::<(), TreeError>(())
449    /// ```
450    ///
451    /// # Errors
452    ///
453    /// Will return `Err` if an IO error occurs.
454    fn last_key_value(
455        &self,
456        seqno: SeqNo,
457        index: Option<Arc<Memtable>>,
458    ) -> crate::Result<Option<KvPair>> {
459        self.iter(seqno, index)
460            .next_back()
461            .map(Guard::into_inner)
462            .transpose()
463    }
464
465    /// Returns the size of a value if it exists.
466    ///
467    /// # Examples
468    ///
469    /// ```
470    /// # let folder = tempfile::tempdir()?;
471    /// use lsm_tree::{AbstractTree, Config, Tree};
472    ///
473    /// let tree = Config::new(folder, Default::default()).open()?;
474    /// tree.insert("a", "my_value", 0);
475    ///
476    /// let size = tree.size_of("a", 1)?.unwrap_or_default();
477    /// assert_eq!("my_value".len() as u32, size);
478    ///
479    /// let size = tree.size_of("b", 1)?.unwrap_or_default();
480    /// assert_eq!(0, size);
481    /// #
482    /// # Ok::<(), lsm_tree::Error>(())
483    /// ```
484    ///
485    /// # Errors
486    ///
487    /// Will return `Err` if an IO error occurs.
488    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
489
490    /// Retrieves an item from the tree.
491    ///
492    /// # Examples
493    ///
494    /// ```
495    /// # let folder = tempfile::tempdir()?;
496    /// use lsm_tree::{AbstractTree, Config, Tree};
497    ///
498    /// let tree = Config::new(folder, Default::default()).open()?;
499    /// tree.insert("a", "my_value", 0);
500    ///
501    /// let item = tree.get("a", 1)?;
502    /// assert_eq!(Some("my_value".as_bytes().into()), item);
503    /// #
504    /// # Ok::<(), lsm_tree::Error>(())
505    /// ```
506    ///
507    /// # Errors
508    ///
509    /// Will return `Err` if an IO error occurs.
510    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
511
512    /// Returns `true` if the tree contains the specified key.
513    ///
514    /// # Examples
515    ///
516    /// ```
517    /// # let folder = tempfile::tempdir()?;
518    /// # use lsm_tree::{AbstractTree, Config, Tree};
519    /// #
520    /// let tree = Config::new(folder, Default::default()).open()?;
521    /// assert!(!tree.contains_key("a", 0)?);
522    ///
523    /// tree.insert("a", "abc", 0);
524    /// assert!(tree.contains_key("a", 1)?);
525    /// #
526    /// # Ok::<(), lsm_tree::Error>(())
527    /// ```
528    ///
529    /// # Errors
530    ///
531    /// Will return `Err` if an IO error occurs.
532    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
533        self.get(key, seqno).map(|x| x.is_some())
534    }
535
536    /// Inserts a key-value pair into the tree.
537    ///
538    /// If the key already exists, the item will be overwritten.
539    ///
540    /// Returns the added item's size and new size of the memtable.
541    ///
542    /// # Examples
543    ///
544    /// ```
545    /// # let folder = tempfile::tempdir()?;
546    /// use lsm_tree::{AbstractTree, Config, Tree};
547    ///
548    /// let tree = Config::new(folder, Default::default()).open()?;
549    /// tree.insert("a", "abc", 0);
550    /// #
551    /// # Ok::<(), lsm_tree::Error>(())
552    /// ```
553    ///
554    /// # Errors
555    ///
556    /// Will return `Err` if an IO error occurs.
557    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
558        &self,
559        key: K,
560        value: V,
561        seqno: SeqNo,
562    ) -> (u64, u64);
563
564    /// Removes an item from the tree.
565    ///
566    /// Returns the added item's size and new size of the memtable.
567    ///
568    /// # Examples
569    ///
570    /// ```
571    /// # let folder = tempfile::tempdir()?;
572    /// # use lsm_tree::{AbstractTree, Config, Tree};
573    /// #
574    /// # let tree = Config::new(folder, Default::default()).open()?;
575    /// tree.insert("a", "abc", 0);
576    ///
577    /// let item = tree.get("a", 1)?.expect("should have item");
578    /// assert_eq!("abc".as_bytes(), &*item);
579    ///
580    /// tree.remove("a", 1);
581    ///
582    /// let item = tree.get("a", 2)?;
583    /// assert_eq!(None, item);
584    /// #
585    /// # Ok::<(), lsm_tree::Error>(())
586    /// ```
587    ///
588    /// # Errors
589    ///
590    /// Will return `Err` if an IO error occurs.
591    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
592
593    /// Removes an item from the tree.
594    ///
595    /// The tombstone marker of this delete operation will vanish when it
596    /// collides with its corresponding insertion.
597    /// This may cause older versions of the value to be resurrected, so it should
598    /// only be used and preferred in scenarios where a key is only ever written once.
599    ///
600    /// Returns the added item's size and new size of the memtable.
601    ///
602    /// # Examples
603    ///
604    /// ```
605    /// # let folder = tempfile::tempdir()?;
606    /// # use lsm_tree::{AbstractTree, Config, Tree};
607    /// #
608    /// # let tree = Config::new(folder, Default::default()).open()?;
609    /// tree.insert("a", "abc", 0);
610    ///
611    /// let item = tree.get("a", 1)?.expect("should have item");
612    /// assert_eq!("abc".as_bytes(), &*item);
613    ///
614    /// tree.remove_weak("a", 1);
615    ///
616    /// let item = tree.get("a", 2)?;
617    /// assert_eq!(None, item);
618    /// #
619    /// # Ok::<(), lsm_tree::Error>(())
620    /// ```
621    ///
622    /// # Errors
623    ///
624    /// Will return `Err` if an IO error occurs.
625    #[doc(hidden)]
626    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
627}