lsm_tree/
abstract.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7    Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
8    UserKey, UserValue,
9};
10use std::{
11    ops::RangeBounds,
12    sync::{Arc, MutexGuard, RwLockWriteGuard},
13};
14
15pub type RangeItem = crate::Result<KvPair>;
16
17/// Generic Tree API
18#[enum_dispatch::enum_dispatch]
19pub trait AbstractTree {
20    /// Returns the number of cached table file descriptors.
21    fn table_file_cache_size(&self) -> usize;
22
23    // TODO: remove
24    #[doc(hidden)]
25    fn version_memtable_size_sum(&self) -> u64 {
26        self.get_version_history_lock().memtable_size_sum()
27    }
28
29    #[doc(hidden)]
30    fn next_table_id(&self) -> TableId;
31
32    #[doc(hidden)]
33    fn id(&self) -> crate::TreeId;
34
35    /// Like [`AbstractTree::get`], but returns the actual internal entry, not just the user value.
36    ///
37    /// Used in tests.
38    #[doc(hidden)]
39    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
40
41    #[doc(hidden)]
42    fn current_version(&self) -> Version;
43
44    #[doc(hidden)]
45    fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, crate::version::SuperVersions>;
46
47    /// Seals the active memtable and flushes to table(s).
48    ///
49    /// If there are already other sealed memtables lined up, those will be flushed as well.
50    ///
51    /// Only used in tests.
52    #[doc(hidden)]
53    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
54        let lock = self.get_flush_lock();
55        self.rotate_memtable();
56        self.flush(&lock, eviction_seqno)?;
57        Ok(())
58    }
59
60    /// Synchronously flushes pending sealed memtables to tables.
61    ///
62    /// Returns the sum of flushed memtable sizes that were flushed.
63    ///
64    /// The function may not return a result, if nothing was flushed.
65    ///
66    /// # Errors
67    ///
68    /// Will return `Err` if an IO error occurs.
69    fn flush(
70        &self,
71        _lock: &MutexGuard<'_, ()>,
72        seqno_threshold: SeqNo,
73    ) -> crate::Result<Option<u64>> {
74        use crate::{compaction::stream::CompactionStream, merge::Merger};
75
76        let version_history = self.get_version_history_lock();
77        let latest = version_history.latest_version();
78
79        if latest.sealed_memtables.len() == 0 {
80            return Ok(None);
81        }
82
83        let sealed_ids = latest
84            .sealed_memtables
85            .iter()
86            .map(|mt| mt.id)
87            .collect::<Vec<_>>();
88
89        let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum();
90
91        let merger = Merger::new(
92            latest
93                .sealed_memtables
94                .iter()
95                .map(|mt| mt.iter().map(Ok))
96                .collect::<Vec<_>>(),
97        );
98        let stream = CompactionStream::new(merger, seqno_threshold);
99
100        drop(version_history);
101
102        if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
103            self.register_tables(
104                &tables,
105                blob_files.as_deref(),
106                None,
107                &sealed_ids,
108                seqno_threshold,
109            )?;
110        }
111
112        Ok(Some(flushed_size))
113    }
114
115    /// Returns an iterator that scans through the entire tree.
116    ///
117    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
118    fn iter(
119        &self,
120        seqno: SeqNo,
121        index: Option<(Arc<Memtable>, SeqNo)>,
122    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
123        self.range::<&[u8], _>(.., seqno, index)
124    }
125
126    /// Returns an iterator over a prefixed set of items.
127    ///
128    /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
129    fn prefix<K: AsRef<[u8]>>(
130        &self,
131        prefix: K,
132        seqno: SeqNo,
133        index: Option<(Arc<Memtable>, SeqNo)>,
134    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
135
136    /// Returns an iterator over a range of items.
137    ///
138    /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
139    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
140        &self,
141        range: R,
142        seqno: SeqNo,
143        index: Option<(Arc<Memtable>, SeqNo)>,
144    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
145
146    /// Ingests a sorted stream of key-value pairs into the tree.
147    ///
148    /// Can only be called on a new fresh, empty tree.
149    ///
150    /// # Errors
151    ///
152    /// Will return `Err` if an IO error occurs.
153    ///
154    /// # Panics
155    ///
156    /// Panics if the tree is **not** initially empty.
157    ///
158    /// Will panic if the input iterator is not sorted in ascending order.
159    #[doc(hidden)]
160    fn ingest(
161        &self,
162        iter: impl Iterator<Item = (UserKey, UserValue)>,
163        seqno_generator: &SequenceNumberCounter,
164        visible_seqno: &SequenceNumberCounter,
165    ) -> crate::Result<()>;
166
167    /// Returns the approximate number of tombstones in the tree.
168    fn tombstone_count(&self) -> u64;
169
170    /// Returns the approximate number of weak tombstones (single deletes) in the tree.
171    fn weak_tombstone_count(&self) -> u64;
172
173    /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
174    fn weak_tombstone_reclaimable_count(&self) -> u64;
175
176    // TODO: clear() with Nuke compaction strategy (write lock) -> drop_range(..)
177
178    /// Drops tables that are fully contained in a given range.
179    ///
180    /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
181    /// If the normalized lower bound is greater than the upper bound, the
182    /// method returns without performing any work.
183    ///
184    /// # Errors
185    ///
186    /// Will return `Err` only if an IO error occurs during compaction.
187    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
188
189    /// Performs major compaction, blocking the caller until it's done.
190    ///
191    /// # Errors
192    ///
193    /// Will return `Err` if an IO error occurs.
194    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
195
196    /// Returns the disk space used by stale blobs.
197    fn stale_blob_bytes(&self) -> u64 {
198        0
199    }
200
201    /// Gets the space usage of all filters in the tree.
202    ///
203    /// May not correspond to the actual memory size because filter blocks may be paged out.
204    fn filter_size(&self) -> usize;
205
206    /// Gets the memory usage of all pinned filters in the tree.
207    fn pinned_filter_size(&self) -> usize;
208
209    /// Gets the memory usage of all pinned index blocks in the tree.
210    fn pinned_block_index_size(&self) -> usize;
211
212    /// Gets the length of the version free list.
213    fn version_free_list_len(&self) -> usize;
214
215    /// Returns the metrics structure.
216    #[cfg(feature = "metrics")]
217    fn metrics(&self) -> &Arc<crate::Metrics>;
218
219    /// Acquires the flush lock which is required to call [`Tree::flush`].
220    fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
221
222    /// Synchronously flushes a memtable to a table.
223    ///
224    /// This method will not make the table immediately available,
225    /// use [`AbstractTree::register_tables`] for that.
226    ///
227    /// # Errors
228    ///
229    /// Will return `Err` if an IO error occurs.
230    #[warn(clippy::type_complexity)]
231    fn flush_to_tables(
232        &self,
233        stream: impl Iterator<Item = crate::Result<InternalValue>>,
234    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>>;
235
236    /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
237    ///
238    /// # Errors
239    ///
240    /// Will return `Err` if an IO error occurs.
241    fn register_tables(
242        &self,
243        tables: &[Table],
244        blob_files: Option<&[BlobFile]>,
245        frag_map: Option<crate::blob_tree::FragmentationMap>,
246        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
247        gc_watermark: SeqNo,
248    ) -> crate::Result<()>;
249
250    /// Clears the active memtable atomically.
251    fn clear_active_memtable(&self);
252
253    /// Sets the active memtable.
254    ///
255    /// May be used to restore the LSM-tree's in-memory state from a write-ahead log
256    /// after tree recovery.
257    fn set_active_memtable(&self, memtable: Memtable);
258
259    /// Returns the number of sealed memtables.
260    fn sealed_memtable_count(&self) -> usize;
261
262    /// Adds a sealed memtables.
263    ///
264    /// May be used to restore the LSM-tree's in-memory state from some journals.
265    fn add_sealed_memtable(&self, memtable: Arc<Memtable>);
266
267    /// Performs compaction on the tree's levels, blocking the caller until it's done.
268    ///
269    /// # Errors
270    ///
271    /// Will return `Err` if an IO error occurs.
272    fn compact(
273        &self,
274        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
275        seqno_threshold: SeqNo,
276    ) -> crate::Result<()>;
277
278    /// Returns the next table's ID.
279    fn get_next_table_id(&self) -> TableId;
280
281    /// Returns the tree config.
282    fn tree_config(&self) -> &Config;
283
284    /// Returns the highest sequence number.
285    fn get_highest_seqno(&self) -> Option<SeqNo> {
286        let memtable_seqno = self.get_highest_memtable_seqno();
287        let table_seqno = self.get_highest_persisted_seqno();
288        memtable_seqno.max(table_seqno)
289    }
290
291    /// Returns the approximate size of the active memtable in bytes.
292    ///
293    /// May be used to flush the memtable if it grows too large.
294    fn active_memtable_size(&self) -> u64;
295
296    /// Returns the tree type.
297    fn tree_type(&self) -> crate::TreeType;
298
299    /// Seals the active memtable.
300    fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
301
302    /// Returns the number of tables currently in the tree.
303    fn table_count(&self) -> usize;
304
305    /// Returns the number of tables in `levels[idx]`.
306    ///
307    /// Returns `None` if the level does not exist (if idx >= 7).
308    fn level_table_count(&self, idx: usize) -> Option<usize>;
309
310    /// Returns the number of disjoint runs in L0.
311    ///
312    /// Can be used to determine whether to write stall.
313    fn l0_run_count(&self) -> usize;
314
315    /// Returns the number of blob files currently in the tree.
316    fn blob_file_count(&self) -> usize;
317
318    /// Approximates the number of items in the tree.
319    fn approximate_len(&self) -> usize;
320
321    /// Returns the disk space usage.
322    fn disk_space(&self) -> u64;
323
324    /// Returns the highest sequence number of the active memtable.
325    fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
326
327    /// Returns the highest sequence number that is flushed to disk.
328    fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
329
330    /// Scans the entire tree, returning the number of items.
331    ///
332    /// ###### Caution
333    ///
334    /// This operation scans the entire tree: O(n) complexity!
335    ///
336    /// Never, under any circumstances, use .`len()` == 0 to check
337    /// if the tree is empty, use [`Tree::is_empty`] instead.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// # use lsm_tree::Error as TreeError;
343    /// use lsm_tree::{AbstractTree, Config, Tree};
344    ///
345    /// let folder = tempfile::tempdir()?;
346    /// let tree = Config::new(folder, Default::default()).open()?;
347    ///
348    /// assert_eq!(tree.len(0, None)?, 0);
349    /// tree.insert("1", "abc", 0);
350    /// tree.insert("3", "abc", 1);
351    /// tree.insert("5", "abc", 2);
352    /// assert_eq!(tree.len(3, None)?, 3);
353    /// #
354    /// # Ok::<(), TreeError>(())
355    /// ```
356    ///
357    /// # Errors
358    ///
359    /// Will return `Err` if an IO error occurs.
360    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
361        let mut count = 0;
362
363        for item in self.iter(seqno, index) {
364            let _ = item.key()?;
365            count += 1;
366        }
367
368        Ok(count)
369    }
370
371    /// Returns `true` if the tree is empty.
372    ///
373    /// This operation has O(log N) complexity.
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// # let folder = tempfile::tempdir()?;
379    /// use lsm_tree::{AbstractTree, Config, Tree};
380    ///
381    /// let tree = Config::new(folder, Default::default()).open()?;
382    /// assert!(tree.is_empty(0, None)?);
383    ///
384    /// tree.insert("a", "abc", 0);
385    /// assert!(!tree.is_empty(1, None)?);
386    /// #
387    /// # Ok::<(), lsm_tree::Error>(())
388    /// ```
389    ///
390    /// # Errors
391    ///
392    /// Will return `Err` if an IO error occurs.
393    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
394        self.first_key_value(seqno, index).map(|x| x.is_none())
395    }
396
397    /// Returns the first key-value pair in the tree.
398    /// The key in this pair is the minimum key in the tree.
399    ///
400    /// # Examples
401    ///
402    /// ```
403    /// # use lsm_tree::Error as TreeError;
404    /// # use lsm_tree::{AbstractTree, Config, Tree};
405    /// #
406    /// # let folder = tempfile::tempdir()?;
407    /// let tree = Config::new(folder, Default::default()).open()?;
408    ///
409    /// tree.insert("1", "abc", 0);
410    /// tree.insert("3", "abc", 1);
411    /// tree.insert("5", "abc", 2);
412    ///
413    /// let (key, _) = tree.first_key_value(3, None)?.expect("item should exist");
414    /// assert_eq!(&*key, "1".as_bytes());
415    /// #
416    /// # Ok::<(), TreeError>(())
417    /// ```
418    ///
419    /// # Errors
420    ///
421    /// Will return `Err` if an IO error occurs.
422    fn first_key_value(
423        &self,
424        seqno: SeqNo,
425        index: Option<(Arc<Memtable>, SeqNo)>,
426    ) -> crate::Result<Option<KvPair>> {
427        self.iter(seqno, index)
428            .next()
429            .map(Guard::into_inner)
430            .transpose()
431    }
432
433    /// Returns the last key-value pair in the tree.
434    /// The key in this pair is the maximum key in the tree.
435    ///
436    /// # Examples
437    ///
438    /// ```
439    /// # use lsm_tree::Error as TreeError;
440    /// # use lsm_tree::{AbstractTree, Config, Tree};
441    /// #
442    /// # let folder = tempfile::tempdir()?;
443    /// # let tree = Config::new(folder, Default::default()).open()?;
444    /// #
445    /// tree.insert("1", "abc", 0);
446    /// tree.insert("3", "abc", 1);
447    /// tree.insert("5", "abc", 2);
448    ///
449    /// let (key, _) = tree.last_key_value(3, None)?.expect("item should exist");
450    /// assert_eq!(&*key, "5".as_bytes());
451    /// #
452    /// # Ok::<(), TreeError>(())
453    /// ```
454    ///
455    /// # Errors
456    ///
457    /// Will return `Err` if an IO error occurs.
458    fn last_key_value(
459        &self,
460        seqno: SeqNo,
461        index: Option<(Arc<Memtable>, SeqNo)>,
462    ) -> crate::Result<Option<KvPair>> {
463        self.iter(seqno, index)
464            .next_back()
465            .map(Guard::into_inner)
466            .transpose()
467    }
468
469    /// Returns the size of a value if it exists.
470    ///
471    /// # Examples
472    ///
473    /// ```
474    /// # let folder = tempfile::tempdir()?;
475    /// use lsm_tree::{AbstractTree, Config, Tree};
476    ///
477    /// let tree = Config::new(folder, Default::default()).open()?;
478    /// tree.insert("a", "my_value", 0);
479    ///
480    /// let size = tree.size_of("a", 1)?.unwrap_or_default();
481    /// assert_eq!("my_value".len() as u32, size);
482    ///
483    /// let size = tree.size_of("b", 1)?.unwrap_or_default();
484    /// assert_eq!(0, size);
485    /// #
486    /// # Ok::<(), lsm_tree::Error>(())
487    /// ```
488    ///
489    /// # Errors
490    ///
491    /// Will return `Err` if an IO error occurs.
492    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
493
494    /// Retrieves an item from the tree.
495    ///
496    /// # Examples
497    ///
498    /// ```
499    /// # let folder = tempfile::tempdir()?;
500    /// use lsm_tree::{AbstractTree, Config, Tree};
501    ///
502    /// let tree = Config::new(folder, Default::default()).open()?;
503    /// tree.insert("a", "my_value", 0);
504    ///
505    /// let item = tree.get("a", 1)?;
506    /// assert_eq!(Some("my_value".as_bytes().into()), item);
507    /// #
508    /// # Ok::<(), lsm_tree::Error>(())
509    /// ```
510    ///
511    /// # Errors
512    ///
513    /// Will return `Err` if an IO error occurs.
514    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
515
516    /// Returns `true` if the tree contains the specified key.
517    ///
518    /// # Examples
519    ///
520    /// ```
521    /// # let folder = tempfile::tempdir()?;
522    /// # use lsm_tree::{AbstractTree, Config, Tree};
523    /// #
524    /// let tree = Config::new(folder, Default::default()).open()?;
525    /// assert!(!tree.contains_key("a", 0)?);
526    ///
527    /// tree.insert("a", "abc", 0);
528    /// assert!(tree.contains_key("a", 1)?);
529    /// #
530    /// # Ok::<(), lsm_tree::Error>(())
531    /// ```
532    ///
533    /// # Errors
534    ///
535    /// Will return `Err` if an IO error occurs.
536    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
537        self.get(key, seqno).map(|x| x.is_some())
538    }
539
540    /// Inserts a key-value pair into the tree.
541    ///
542    /// If the key already exists, the item will be overwritten.
543    ///
544    /// Returns the added item's size and new size of the memtable.
545    ///
546    /// # Examples
547    ///
548    /// ```
549    /// # let folder = tempfile::tempdir()?;
550    /// use lsm_tree::{AbstractTree, Config, Tree};
551    ///
552    /// let tree = Config::new(folder, Default::default()).open()?;
553    /// tree.insert("a", "abc", 0);
554    /// #
555    /// # Ok::<(), lsm_tree::Error>(())
556    /// ```
557    ///
558    /// # Errors
559    ///
560    /// Will return `Err` if an IO error occurs.
561    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
562        &self,
563        key: K,
564        value: V,
565        seqno: SeqNo,
566    ) -> (u64, u64);
567
568    /// Removes an item from the tree.
569    ///
570    /// Returns the added item's size and new size of the memtable.
571    ///
572    /// # Examples
573    ///
574    /// ```
575    /// # let folder = tempfile::tempdir()?;
576    /// # use lsm_tree::{AbstractTree, Config, Tree};
577    /// #
578    /// # let tree = Config::new(folder, Default::default()).open()?;
579    /// tree.insert("a", "abc", 0);
580    ///
581    /// let item = tree.get("a", 1)?.expect("should have item");
582    /// assert_eq!("abc".as_bytes(), &*item);
583    ///
584    /// tree.remove("a", 1);
585    ///
586    /// let item = tree.get("a", 2)?;
587    /// assert_eq!(None, item);
588    /// #
589    /// # Ok::<(), lsm_tree::Error>(())
590    /// ```
591    ///
592    /// # Errors
593    ///
594    /// Will return `Err` if an IO error occurs.
595    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
596
597    /// Removes an item from the tree.
598    ///
599    /// The tombstone marker of this delete operation will vanish when it
600    /// collides with its corresponding insertion.
601    /// This may cause older versions of the value to be resurrected, so it should
602    /// only be used and preferred in scenarios where a key is only ever written once.
603    ///
604    /// Returns the added item's size and new size of the memtable.
605    ///
606    /// # Examples
607    ///
608    /// ```
609    /// # let folder = tempfile::tempdir()?;
610    /// # use lsm_tree::{AbstractTree, Config, Tree};
611    /// #
612    /// # let tree = Config::new(folder, Default::default()).open()?;
613    /// tree.insert("a", "abc", 0);
614    ///
615    /// let item = tree.get("a", 1)?.expect("should have item");
616    /// assert_eq!("abc".as_bytes(), &*item);
617    ///
618    /// tree.remove_weak("a", 1);
619    ///
620    /// let item = tree.get("a", 2)?;
621    /// assert_eq!(None, item);
622    /// #
623    /// # Ok::<(), lsm_tree::Error>(())
624    /// ```
625    ///
626    /// # Errors
627    ///
628    /// Will return `Err` if an IO error occurs.
629    #[doc(hidden)]
630    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
631}