Skip to main content

lsm_tree/
abstract_tree.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7    Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue,
8};
9use std::{
10    ops::RangeBounds,
11    sync::{Arc, MutexGuard, RwLockWriteGuard},
12};
13
/// A single fallible iteration item: either a [`KvPair`] or the crate's error type.
///
/// NOTE(review): presumably the item type of range/prefix scans - confirm against callers.
pub type RangeItem = crate::Result<KvPair>;
15
/// Payload returned by [`AbstractTree::flush_to_tables`]: the tables, plus an optional list of blob files.
type FlushToTablesResult = (Vec<Table>, Option<Vec<BlobFile>>);
17
18/// Generic Tree API
19#[enum_dispatch::enum_dispatch]
20pub trait AbstractTree {
21    /// Debug method for tracing the MVCC history of a key.
22    #[doc(hidden)]
23    fn print_trace(&self, key: &[u8]) -> crate::Result<()>;
24
25    /// Returns the number of cached table file descriptors.
26    fn table_file_cache_size(&self) -> usize;
27
28    // TODO: remove
29    #[doc(hidden)]
30    fn version_memtable_size_sum(&self) -> u64 {
31        self.get_version_history_lock().memtable_size_sum()
32    }
33
    /// Returns a [`TableId`].
    ///
    /// NOTE(review): overlaps with [`AbstractTree::get_next_table_id`]; whether either call
    /// advances the ID counter is not visible from this trait - confirm before relying on it.
    #[doc(hidden)]
    fn next_table_id(&self) -> TableId;
36
    /// Returns this tree's [`crate::TreeId`].
    #[doc(hidden)]
    fn id(&self) -> crate::TreeId;
39
40    /// Like [`AbstractTree::get`], but returns the actual internal entry, not just the user value.
41    ///
42    /// Used in tests.
43    #[doc(hidden)]
44    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
45
    /// Returns the tree's current [`Version`] (by value).
    #[doc(hidden)]
    fn current_version(&self) -> Version;
48
    /// Acquires a write guard on the tree's version history
    /// ([`crate::version::SuperVersions`]).
    ///
    /// The returned guard is a write guard, even when the caller only reads through it,
    /// so it should be dropped as soon as it is no longer needed.
    #[doc(hidden)]
    fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, crate::version::SuperVersions>;
51
52    /// Seals the active memtable and flushes to table(s).
53    ///
54    /// If there are already other sealed memtables lined up, those will be flushed as well.
55    ///
56    /// Only used in tests.
57    #[doc(hidden)]
58    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
59        let lock = self.get_flush_lock();
60        self.rotate_memtable();
61        self.flush(&lock, eviction_seqno)?;
62        Ok(())
63    }
64
    /// Synchronously flushes all pending sealed memtables to tables.
    ///
    /// The sealed memtables of the latest version are merged into a single stream,
    /// written out via [`AbstractTree::flush_to_tables`], and the resulting tables are
    /// then registered with [`AbstractTree::register_tables`].
    ///
    /// `_lock` is a flush lock guard (see [`AbstractTree::get_flush_lock`]); it is never
    /// read, the parameter only forces the caller to hold the lock.
    ///
    /// `seqno_threshold` is handed to the compaction stream and is also passed to
    /// `register_tables` as the GC watermark.
    ///
    /// Returns `Ok(None)` if there are no sealed memtables, otherwise `Ok(Some(size))`
    /// where `size` is the sum of the sizes of the sealed memtables that were processed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if writing or registering the tables fails (e.g. on an IO error).
    fn flush(
        &self,
        _lock: &MutexGuard<'_, ()>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<u64>> {
        use crate::{compaction::stream::CompactionStream, merge::Merger};

        // The version history guard is only needed while the sealed memtables are collected.
        let version_history = self.get_version_history_lock();
        let latest = version_history.latest_version();

        if latest.sealed_memtables.len() == 0 {
            return Ok(None);
        }

        // Remember which memtables are being flushed, so they can be removed on registration.
        let sealed_ids = latest
            .sealed_memtables
            .iter()
            .map(|mt| mt.id)
            .collect::<Vec<_>>();

        let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum();

        // Merge all sealed memtables into one stream.
        let merger = Merger::new(
            latest
                .sealed_memtables
                .iter()
                .map(|mt| mt.iter().map(Ok))
                .collect::<Vec<_>>(),
        );
        let stream = CompactionStream::new(merger, seqno_threshold);

        // Release the version history guard before writing the tables.
        drop(version_history);

        // NOTE(review): if `flush_to_tables` yields `None`, nothing is registered and the
        // sealed memtables are not removed here, yet `Some(flushed_size)` is still
        // returned - confirm that this is intended.
        if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
            self.register_tables(
                &tables,
                blob_files.as_deref(),
                None,
                &sealed_ids,
                seqno_threshold,
            )?;
        }

        Ok(Some(flushed_size))
    }
119
120    /// Returns an iterator that scans through the entire tree.
121    ///
122    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
123    fn iter(
124        &self,
125        seqno: SeqNo,
126        index: Option<(Arc<Memtable>, SeqNo)>,
127    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
128        self.range::<&[u8], _>(.., seqno, index)
129    }
130
131    /// Returns an iterator over a prefixed set of items.
132    ///
133    /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
134    fn prefix<K: AsRef<[u8]>>(
135        &self,
136        prefix: K,
137        seqno: SeqNo,
138        index: Option<(Arc<Memtable>, SeqNo)>,
139    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
140
141    /// Returns an iterator over a range of items.
142    ///
143    /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
144    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
145        &self,
146        range: R,
147        seqno: SeqNo,
148        index: Option<(Arc<Memtable>, SeqNo)>,
149    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
150
151    /// Returns the approximate number of tombstones in the tree.
152    fn tombstone_count(&self) -> u64;
153
154    /// Returns the approximate number of weak tombstones (single deletes) in the tree.
155    fn weak_tombstone_count(&self) -> u64;
156
157    /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
158    fn weak_tombstone_reclaimable_count(&self) -> u64;
159
160    /// Drops tables that are fully contained in a given range.
161    ///
162    /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
163    /// If the normalized lower bound is greater than the upper bound, the
164    /// method returns without performing any work.
165    ///
166    /// # Errors
167    ///
168    /// Will return `Err` only if an IO error occurs.
169    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
170
171    /// Drops all tables and clears all memtables atomically.
172    ///
173    /// # Errors
174    ///
175    /// Will return `Err` only if an IO error occurs.
176    fn clear(&self) -> crate::Result<()>;
177
178    /// Performs major compaction, blocking the caller until it's done.
179    ///
180    /// # Errors
181    ///
182    /// Will return `Err` if an IO error occurs.
183    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
184
    /// Returns the disk space used by stale blobs.
    ///
    /// The default implementation always returns `0`.
    ///
    /// NOTE(review): presumably overridden by blob-backed trees - confirm with implementors.
    fn stale_blob_bytes(&self) -> u64 {
        0
    }
189
190    /// Gets the disk space usage of all filters in the tree.
191    ///
192    /// May not correspond to the actual memory size because filter blocks may be paged out.
193    fn filter_size(&self) -> u64;
194
195    /// Gets the memory usage of all pinned filters in the tree.
196    fn pinned_filter_size(&self) -> usize;
197
198    /// Gets the memory usage of all pinned index blocks in the tree.
199    fn pinned_block_index_size(&self) -> usize;
200
201    /// Gets the length of the version free list.
202    fn version_free_list_len(&self) -> usize;
203
204    /// Returns the metrics structure.
205    #[cfg(feature = "metrics")]
206    fn metrics(&self) -> &Arc<crate::Metrics>;
207
    /// Acquires the flush lock which is required to call [`AbstractTree::flush`].
    ///
    /// A reference to the returned guard is what `flush` takes as its `_lock` argument.
    fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
210
    /// Synchronously writes a stream of items to table(s).
    ///
    /// On success the result is either `None` or `Some((tables, blob_files))`,
    /// where `blob_files` is itself optional.
    ///
    /// NOTE(review): `None` presumably means that nothing was written - confirm with implementors.
    ///
    /// This method will not make the tables immediately available,
    /// use [`AbstractTree::register_tables`] for that.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[warn(clippy::type_complexity)]
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<FlushToTablesResult>>;
224
225    /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
226    ///
227    /// # Errors
228    ///
229    /// Will return `Err` if an IO error occurs.
230    fn register_tables(
231        &self,
232        tables: &[Table],
233        blob_files: Option<&[BlobFile]>,
234        frag_map: Option<crate::blob_tree::FragmentationMap>,
235        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
236        gc_watermark: SeqNo,
237    ) -> crate::Result<()>;
238
239    /// Clears the active memtable atomically.
240    fn clear_active_memtable(&self);
241
242    /// Returns the number of sealed memtables.
243    fn sealed_memtable_count(&self) -> usize;
244
245    /// Performs compaction on the tree's levels, blocking the caller until it's done.
246    ///
247    /// # Errors
248    ///
249    /// Will return `Err` if an IO error occurs.
250    fn compact(
251        &self,
252        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
253        seqno_threshold: SeqNo,
254    ) -> crate::Result<()>;
255
256    /// Returns the next table's ID.
257    fn get_next_table_id(&self) -> TableId;
258
259    /// Returns the tree config.
260    fn tree_config(&self) -> &Config;
261
262    /// Returns the highest sequence number.
263    fn get_highest_seqno(&self) -> Option<SeqNo> {
264        let memtable_seqno = self.get_highest_memtable_seqno();
265        let table_seqno = self.get_highest_persisted_seqno();
266        memtable_seqno.max(table_seqno)
267    }
268
269    /// Returns the active memtable.
270    fn active_memtable(&self) -> Arc<Memtable>;
271
272    /// Returns the tree type.
273    fn tree_type(&self) -> crate::TreeType {
274        // NOTE: This is only really safe to do, because we validate
275        // that the config's kv_separation_opts is consistent with the tree type during recovery.
276        if self.tree_config().kv_separation_opts.is_some() {
277            crate::TreeType::Blob
278        } else {
279            crate::TreeType::Standard
280        }
281    }
282
283    /// Seals the active memtable.
284    fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
285
286    /// Returns the number of tables currently in the tree.
287    fn table_count(&self) -> usize;
288
289    /// Returns the number of tables in `levels[idx]`.
290    ///
291    /// Returns `None` if the level does not exist (if idx >= 7).
292    fn level_table_count(&self, idx: usize) -> Option<usize>;
293
294    /// Returns the number of disjoint runs in L0.
295    ///
296    /// Can be used to determine whether to write stall.
297    fn l0_run_count(&self) -> usize;
298
299    /// Returns the number of blob files currently in the tree.
300    fn blob_file_count(&self) -> usize;
301
302    /// Approximates the number of items in the tree.
303    fn approximate_len(&self) -> usize;
304
305    /// Returns the disk space usage.
306    fn disk_space(&self) -> u64;
307
308    /// Returns the highest sequence number of the active memtable.
309    fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
310
311    /// Returns the highest sequence number that is flushed to disk.
312    fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
313
314    /// Scans the entire tree, returning the number of items.
315    ///
316    /// ###### Caution
317    ///
318    /// This operation scans the entire tree: O(n) complexity!
319    ///
320    /// Never, under any circumstances, use .`len()` == 0 to check
321    /// if the tree is empty, use [`Tree::is_empty`] instead.
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// # use lsm_tree::Error as TreeError;
327    /// use lsm_tree::{AbstractTree, Config, Tree};
328    ///
329    /// let folder = tempfile::tempdir()?;
330    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
331    ///
332    /// assert_eq!(tree.len(0, None)?, 0);
333    /// tree.insert("1", "abc", 0);
334    /// tree.insert("3", "abc", 1);
335    /// tree.insert("5", "abc", 2);
336    /// assert_eq!(tree.len(3, None)?, 3);
337    /// #
338    /// # Ok::<(), TreeError>(())
339    /// ```
340    ///
341    /// # Errors
342    ///
343    /// Will return `Err` if an IO error occurs.
344    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
345        let mut count = 0;
346
347        for item in self.iter(seqno, index) {
348            let _ = item.key()?;
349            count += 1;
350        }
351
352        Ok(count)
353    }
354
355    /// Returns `true` if the tree is empty.
356    ///
357    /// This operation has O(log N) complexity.
358    ///
359    /// # Examples
360    ///
361    /// ```
362    /// # let folder = tempfile::tempdir()?;
363    /// use lsm_tree::{AbstractTree, Config, Tree};
364    ///
365    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
366    /// assert!(tree.is_empty(0, None)?);
367    ///
368    /// tree.insert("a", "abc", 0);
369    /// assert!(!tree.is_empty(1, None)?);
370    /// #
371    /// # Ok::<(), lsm_tree::Error>(())
372    /// ```
373    ///
374    /// # Errors
375    ///
376    /// Will return `Err` if an IO error occurs.
377    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
378        Ok(self
379            .first_key_value(seqno, index)
380            .map(crate::Guard::key)
381            .transpose()?
382            .is_none())
383    }
384
385    /// Returns the first key-value pair in the tree.
386    /// The key in this pair is the minimum key in the tree.
387    ///
388    /// # Examples
389    ///
390    /// ```
391    /// # use lsm_tree::Error as TreeError;
392    /// # use lsm_tree::{AbstractTree, Config, Tree, Guard};
393    /// #
394    /// # let folder = tempfile::tempdir()?;
395    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
396    ///
397    /// tree.insert("1", "abc", 0);
398    /// tree.insert("3", "abc", 1);
399    /// tree.insert("5", "abc", 2);
400    ///
401    /// let key = tree.first_key_value(3, None).expect("item should exist").key()?;
402    /// assert_eq!(&*key, "1".as_bytes());
403    /// #
404    /// # Ok::<(), TreeError>(())
405    /// ```
406    ///
407    /// # Errors
408    ///
409    /// Will return `Err` if an IO error occurs.
410    fn first_key_value(
411        &self,
412        seqno: SeqNo,
413        index: Option<(Arc<Memtable>, SeqNo)>,
414    ) -> Option<IterGuardImpl> {
415        self.iter(seqno, index).next()
416    }
417
418    /// Returns the last key-value pair in the tree.
419    /// The key in this pair is the maximum key in the tree.
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// # use lsm_tree::Error as TreeError;
425    /// # use lsm_tree::{AbstractTree, Config, Tree, Guard};
426    /// #
427    /// # let folder = tempfile::tempdir()?;
428    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
429    /// #
430    /// tree.insert("1", "abc", 0);
431    /// tree.insert("3", "abc", 1);
432    /// tree.insert("5", "abc", 2);
433    ///
434    /// let key = tree.last_key_value(3, None).expect("item should exist").key()?;
435    /// assert_eq!(&*key, "5".as_bytes());
436    /// #
437    /// # Ok::<(), TreeError>(())
438    /// ```
439    ///
440    /// # Errors
441    ///
442    /// Will return `Err` if an IO error occurs.
443    fn last_key_value(
444        &self,
445        seqno: SeqNo,
446        index: Option<(Arc<Memtable>, SeqNo)>,
447    ) -> Option<IterGuardImpl> {
448        self.iter(seqno, index).next_back()
449    }
450
451    /// Returns the size of a value if it exists.
452    ///
453    /// # Examples
454    ///
455    /// ```
456    /// # let folder = tempfile::tempdir()?;
457    /// use lsm_tree::{AbstractTree, Config, Tree};
458    ///
459    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
460    /// tree.insert("a", "my_value", 0);
461    ///
462    /// let size = tree.size_of("a", 1)?.unwrap_or_default();
463    /// assert_eq!("my_value".len() as u32, size);
464    ///
465    /// let size = tree.size_of("b", 1)?.unwrap_or_default();
466    /// assert_eq!(0, size);
467    /// #
468    /// # Ok::<(), lsm_tree::Error>(())
469    /// ```
470    ///
471    /// # Errors
472    ///
473    /// Will return `Err` if an IO error occurs.
474    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
475
476    /// Retrieves an item from the tree.
477    ///
478    /// # Examples
479    ///
480    /// ```
481    /// # let folder = tempfile::tempdir()?;
482    /// use lsm_tree::{AbstractTree, Config, Tree};
483    ///
484    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
485    /// tree.insert("a", "my_value", 0);
486    ///
487    /// let item = tree.get("a", 1)?;
488    /// assert_eq!(Some("my_value".as_bytes().into()), item);
489    /// #
490    /// # Ok::<(), lsm_tree::Error>(())
491    /// ```
492    ///
493    /// # Errors
494    ///
495    /// Will return `Err` if an IO error occurs.
496    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
497
498    /// Returns `true` if the tree contains the specified key.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// # let folder = tempfile::tempdir()?;
504    /// # use lsm_tree::{AbstractTree, Config, Tree};
505    /// #
506    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
507    /// assert!(!tree.contains_key("a", 0)?);
508    ///
509    /// tree.insert("a", "abc", 0);
510    /// assert!(tree.contains_key("a", 1)?);
511    /// #
512    /// # Ok::<(), lsm_tree::Error>(())
513    /// ```
514    ///
515    /// # Errors
516    ///
517    /// Will return `Err` if an IO error occurs.
518    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
519        self.get(key, seqno).map(|x| x.is_some())
520    }
521
    /// Inserts a key-value pair into the tree.
    ///
    /// If the key already exists, the item will be overwritten.
    ///
    /// Returns the added item's size and new size of the memtable.
    ///
    /// This method does not return a `Result`, so no error is reported to the caller.
    ///
    /// # Examples
    ///
    /// ```
    /// # let folder = tempfile::tempdir()?;
    /// use lsm_tree::{AbstractTree, Config, Tree};
    ///
    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
    /// tree.insert("a", "abc", 0);
    /// #
    /// # Ok::<(), lsm_tree::Error>(())
    /// ```
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64);
549
    /// Removes an item from the tree.
    ///
    /// Returns the added item's size and new size of the memtable.
    ///
    /// This method does not return a `Result`, so no error is reported to the caller.
    ///
    /// # Examples
    ///
    /// ```
    /// # let folder = tempfile::tempdir()?;
    /// # use lsm_tree::{AbstractTree, Config, Tree};
    /// #
    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
    /// tree.insert("a", "abc", 0);
    ///
    /// let item = tree.get("a", 1)?.expect("should have item");
    /// assert_eq!("abc".as_bytes(), &*item);
    ///
    /// tree.remove("a", 1);
    ///
    /// let item = tree.get("a", 2)?;
    /// assert_eq!(None, item);
    /// #
    /// # Ok::<(), lsm_tree::Error>(())
    /// ```
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
578
    /// Removes an item from the tree using a weak tombstone.
    ///
    /// The tombstone marker of this delete operation will vanish when it
    /// collides with its corresponding insertion.
    /// This may cause older versions of the value to be resurrected, so it should
    /// only be used and preferred in scenarios where a key is only ever written once.
    ///
    /// Returns the added item's size and new size of the memtable.
    ///
    /// This method does not return a `Result`, so no error is reported to the caller.
    ///
    /// # Examples
    ///
    /// ```
    /// # let folder = tempfile::tempdir()?;
    /// # use lsm_tree::{AbstractTree, Config, Tree};
    /// #
    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
    /// tree.insert("a", "abc", 0);
    ///
    /// let item = tree.get("a", 1)?.expect("should have item");
    /// assert_eq!("abc".as_bytes(), &*item);
    ///
    /// tree.remove_weak("a", 1);
    ///
    /// let item = tree.get("a", 2)?;
    /// assert_eq!(None, item);
    /// #
    /// # Ok::<(), lsm_tree::Error>(())
    /// ```
    #[doc(hidden)]
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
613}