lsm_tree/
abstract.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7    Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue,
8};
9use std::{
10    ops::RangeBounds,
11    sync::{Arc, MutexGuard, RwLockWriteGuard},
12};
13
14pub type RangeItem = crate::Result<KvPair>;
15
16/// Generic Tree API
17#[enum_dispatch::enum_dispatch]
18pub trait AbstractTree {
19    // TODO: fn() with Nuke compaction strategy (write lock) -> drop_range(..)
20
21    /// Returns the number of cached table file descriptors.
22    fn table_file_cache_size(&self) -> usize;
23
24    // TODO: remove
25    #[doc(hidden)]
26    fn version_memtable_size_sum(&self) -> u64 {
27        self.get_version_history_lock().memtable_size_sum()
28    }
29
30    #[doc(hidden)]
31    fn next_table_id(&self) -> TableId;
32
33    #[doc(hidden)]
34    fn id(&self) -> crate::TreeId;
35
36    /// Like [`AbstractTree::get`], but returns the actual internal entry, not just the user value.
37    ///
38    /// Used in tests.
39    #[doc(hidden)]
40    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
41
42    #[doc(hidden)]
43    fn current_version(&self) -> Version;
44
45    #[doc(hidden)]
46    fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, crate::version::SuperVersions>;
47
48    /// Seals the active memtable and flushes to table(s).
49    ///
50    /// If there are already other sealed memtables lined up, those will be flushed as well.
51    ///
52    /// Only used in tests.
53    #[doc(hidden)]
54    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
55        let lock = self.get_flush_lock();
56        self.rotate_memtable();
57        self.flush(&lock, eviction_seqno)?;
58        Ok(())
59    }
60
61    /// Synchronously flushes pending sealed memtables to tables.
62    ///
63    /// Returns the sum of flushed memtable sizes that were flushed.
64    ///
65    /// The function may not return a result, if nothing was flushed.
66    ///
67    /// # Errors
68    ///
69    /// Will return `Err` if an IO error occurs.
70    fn flush(
71        &self,
72        _lock: &MutexGuard<'_, ()>,
73        seqno_threshold: SeqNo,
74    ) -> crate::Result<Option<u64>> {
75        use crate::{compaction::stream::CompactionStream, merge::Merger};
76
77        let version_history = self.get_version_history_lock();
78        let latest = version_history.latest_version();
79
80        if latest.sealed_memtables.len() == 0 {
81            return Ok(None);
82        }
83
84        let sealed_ids = latest
85            .sealed_memtables
86            .iter()
87            .map(|mt| mt.id)
88            .collect::<Vec<_>>();
89
90        let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum();
91
92        let merger = Merger::new(
93            latest
94                .sealed_memtables
95                .iter()
96                .map(|mt| mt.iter().map(Ok))
97                .collect::<Vec<_>>(),
98        );
99        let stream = CompactionStream::new(merger, seqno_threshold);
100
101        drop(version_history);
102
103        if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
104            self.register_tables(
105                &tables,
106                blob_files.as_deref(),
107                None,
108                &sealed_ids,
109                seqno_threshold,
110            )?;
111        }
112
113        Ok(Some(flushed_size))
114    }
115
116    /// Returns an iterator that scans through the entire tree.
117    ///
118    /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
119    fn iter(
120        &self,
121        seqno: SeqNo,
122        index: Option<(Arc<Memtable>, SeqNo)>,
123    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
124        self.range::<&[u8], _>(.., seqno, index)
125    }
126
127    /// Returns an iterator over a prefixed set of items.
128    ///
129    /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
130    fn prefix<K: AsRef<[u8]>>(
131        &self,
132        prefix: K,
133        seqno: SeqNo,
134        index: Option<(Arc<Memtable>, SeqNo)>,
135    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
136
137    /// Returns an iterator over a range of items.
138    ///
139    /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
140    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
141        &self,
142        range: R,
143        seqno: SeqNo,
144        index: Option<(Arc<Memtable>, SeqNo)>,
145    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
146
147    /// Returns the approximate number of tombstones in the tree.
148    fn tombstone_count(&self) -> u64;
149
150    /// Returns the approximate number of weak tombstones (single deletes) in the tree.
151    fn weak_tombstone_count(&self) -> u64;
152
153    /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
154    fn weak_tombstone_reclaimable_count(&self) -> u64;
155
156    /// Drops tables that are fully contained in a given range.
157    ///
158    /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
159    /// If the normalized lower bound is greater than the upper bound, the
160    /// method returns without performing any work.
161    ///
162    /// # Errors
163    ///
164    /// Will return `Err` only if an IO error occurs.
165    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
166
167    /// Drops all tables and clears all memtables atomically.
168    ///
169    /// # Errors
170    ///
171    /// Will return `Err` only if an IO error occurs.
172    fn clear(&self) -> crate::Result<()>;
173
174    /// Performs major compaction, blocking the caller until it's done.
175    ///
176    /// # Errors
177    ///
178    /// Will return `Err` if an IO error occurs.
179    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
180
181    /// Returns the disk space used by stale blobs.
182    fn stale_blob_bytes(&self) -> u64 {
183        0
184    }
185
186    /// Gets the space usage of all filters in the tree.
187    ///
188    /// May not correspond to the actual memory size because filter blocks may be paged out.
189    fn filter_size(&self) -> u64;
190
191    /// Gets the memory usage of all pinned filters in the tree.
192    fn pinned_filter_size(&self) -> usize;
193
194    /// Gets the memory usage of all pinned index blocks in the tree.
195    fn pinned_block_index_size(&self) -> usize;
196
197    /// Gets the length of the version free list.
198    fn version_free_list_len(&self) -> usize;
199
200    /// Returns the metrics structure.
201    #[cfg(feature = "metrics")]
202    fn metrics(&self) -> &Arc<crate::Metrics>;
203
204    /// Acquires the flush lock which is required to call [`Tree::flush`].
205    fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
206
207    /// Synchronously flushes a memtable to a table.
208    ///
209    /// This method will not make the table immediately available,
210    /// use [`AbstractTree::register_tables`] for that.
211    ///
212    /// # Errors
213    ///
214    /// Will return `Err` if an IO error occurs.
215    #[warn(clippy::type_complexity)]
216    fn flush_to_tables(
217        &self,
218        stream: impl Iterator<Item = crate::Result<InternalValue>>,
219    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>>;
220
221    /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
222    ///
223    /// # Errors
224    ///
225    /// Will return `Err` if an IO error occurs.
226    fn register_tables(
227        &self,
228        tables: &[Table],
229        blob_files: Option<&[BlobFile]>,
230        frag_map: Option<crate::blob_tree::FragmentationMap>,
231        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
232        gc_watermark: SeqNo,
233    ) -> crate::Result<()>;
234
235    /// Clears the active memtable atomically.
236    fn clear_active_memtable(&self);
237
238    /// Returns the number of sealed memtables.
239    fn sealed_memtable_count(&self) -> usize;
240
241    /// Adds a sealed memtables.
242    ///
243    /// May be used to restore the LSM-tree's in-memory state from some journals.
244    fn add_sealed_memtable(&self, memtable: Arc<Memtable>);
245
246    /// Performs compaction on the tree's levels, blocking the caller until it's done.
247    ///
248    /// # Errors
249    ///
250    /// Will return `Err` if an IO error occurs.
251    fn compact(
252        &self,
253        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
254        seqno_threshold: SeqNo,
255    ) -> crate::Result<()>;
256
257    /// Returns the next table's ID.
258    fn get_next_table_id(&self) -> TableId;
259
260    /// Returns the tree config.
261    fn tree_config(&self) -> &Config;
262
263    /// Returns the highest sequence number.
264    fn get_highest_seqno(&self) -> Option<SeqNo> {
265        let memtable_seqno = self.get_highest_memtable_seqno();
266        let table_seqno = self.get_highest_persisted_seqno();
267        memtable_seqno.max(table_seqno)
268    }
269
270    /// Returns the active memtable.
271    fn active_memtable(&self) -> Arc<Memtable>;
272
273    /// Returns the tree type.
274    fn tree_type(&self) -> crate::TreeType;
275
276    /// Seals the active memtable.
277    fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
278
279    /// Returns the number of tables currently in the tree.
280    fn table_count(&self) -> usize;
281
282    /// Returns the number of tables in `levels[idx]`.
283    ///
284    /// Returns `None` if the level does not exist (if idx >= 7).
285    fn level_table_count(&self, idx: usize) -> Option<usize>;
286
287    /// Returns the number of disjoint runs in L0.
288    ///
289    /// Can be used to determine whether to write stall.
290    fn l0_run_count(&self) -> usize;
291
292    /// Returns the number of blob files currently in the tree.
293    fn blob_file_count(&self) -> usize;
294
295    /// Approximates the number of items in the tree.
296    fn approximate_len(&self) -> usize;
297
298    /// Returns the disk space usage.
299    fn disk_space(&self) -> u64;
300
301    /// Returns the highest sequence number of the active memtable.
302    fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
303
304    /// Returns the highest sequence number that is flushed to disk.
305    fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
306
307    /// Scans the entire tree, returning the number of items.
308    ///
309    /// ###### Caution
310    ///
311    /// This operation scans the entire tree: O(n) complexity!
312    ///
313    /// Never, under any circumstances, use .`len()` == 0 to check
314    /// if the tree is empty, use [`Tree::is_empty`] instead.
315    ///
316    /// # Examples
317    ///
318    /// ```
319    /// # use lsm_tree::Error as TreeError;
320    /// use lsm_tree::{AbstractTree, Config, Tree};
321    ///
322    /// let folder = tempfile::tempdir()?;
323    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
324    ///
325    /// assert_eq!(tree.len(0, None)?, 0);
326    /// tree.insert("1", "abc", 0);
327    /// tree.insert("3", "abc", 1);
328    /// tree.insert("5", "abc", 2);
329    /// assert_eq!(tree.len(3, None)?, 3);
330    /// #
331    /// # Ok::<(), TreeError>(())
332    /// ```
333    ///
334    /// # Errors
335    ///
336    /// Will return `Err` if an IO error occurs.
337    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
338        let mut count = 0;
339
340        for item in self.iter(seqno, index) {
341            let _ = item.key()?;
342            count += 1;
343        }
344
345        Ok(count)
346    }
347
348    /// Returns `true` if the tree is empty.
349    ///
350    /// This operation has O(log N) complexity.
351    ///
352    /// # Examples
353    ///
354    /// ```
355    /// # let folder = tempfile::tempdir()?;
356    /// use lsm_tree::{AbstractTree, Config, Tree};
357    ///
358    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
359    /// assert!(tree.is_empty(0, None)?);
360    ///
361    /// tree.insert("a", "abc", 0);
362    /// assert!(!tree.is_empty(1, None)?);
363    /// #
364    /// # Ok::<(), lsm_tree::Error>(())
365    /// ```
366    ///
367    /// # Errors
368    ///
369    /// Will return `Err` if an IO error occurs.
370    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
371        self.first_key_value(seqno, index).map(|x| x.is_none())
372    }
373
374    /// Returns the first key-value pair in the tree.
375    /// The key in this pair is the minimum key in the tree.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// # use lsm_tree::Error as TreeError;
381    /// # use lsm_tree::{AbstractTree, Config, Tree};
382    /// #
383    /// # let folder = tempfile::tempdir()?;
384    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
385    ///
386    /// tree.insert("1", "abc", 0);
387    /// tree.insert("3", "abc", 1);
388    /// tree.insert("5", "abc", 2);
389    ///
390    /// let (key, _) = tree.first_key_value(3, None)?.expect("item should exist");
391    /// assert_eq!(&*key, "1".as_bytes());
392    /// #
393    /// # Ok::<(), TreeError>(())
394    /// ```
395    ///
396    /// # Errors
397    ///
398    /// Will return `Err` if an IO error occurs.
399    fn first_key_value(
400        &self,
401        seqno: SeqNo,
402        index: Option<(Arc<Memtable>, SeqNo)>,
403    ) -> crate::Result<Option<KvPair>> {
404        self.iter(seqno, index)
405            .next()
406            .map(Guard::into_inner)
407            .transpose()
408    }
409
410    /// Returns the last key-value pair in the tree.
411    /// The key in this pair is the maximum key in the tree.
412    ///
413    /// # Examples
414    ///
415    /// ```
416    /// # use lsm_tree::Error as TreeError;
417    /// # use lsm_tree::{AbstractTree, Config, Tree};
418    /// #
419    /// # let folder = tempfile::tempdir()?;
420    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
421    /// #
422    /// tree.insert("1", "abc", 0);
423    /// tree.insert("3", "abc", 1);
424    /// tree.insert("5", "abc", 2);
425    ///
426    /// let (key, _) = tree.last_key_value(3, None)?.expect("item should exist");
427    /// assert_eq!(&*key, "5".as_bytes());
428    /// #
429    /// # Ok::<(), TreeError>(())
430    /// ```
431    ///
432    /// # Errors
433    ///
434    /// Will return `Err` if an IO error occurs.
435    fn last_key_value(
436        &self,
437        seqno: SeqNo,
438        index: Option<(Arc<Memtable>, SeqNo)>,
439    ) -> crate::Result<Option<KvPair>> {
440        self.iter(seqno, index)
441            .next_back()
442            .map(Guard::into_inner)
443            .transpose()
444    }
445
446    /// Returns the size of a value if it exists.
447    ///
448    /// # Examples
449    ///
450    /// ```
451    /// # let folder = tempfile::tempdir()?;
452    /// use lsm_tree::{AbstractTree, Config, Tree};
453    ///
454    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
455    /// tree.insert("a", "my_value", 0);
456    ///
457    /// let size = tree.size_of("a", 1)?.unwrap_or_default();
458    /// assert_eq!("my_value".len() as u32, size);
459    ///
460    /// let size = tree.size_of("b", 1)?.unwrap_or_default();
461    /// assert_eq!(0, size);
462    /// #
463    /// # Ok::<(), lsm_tree::Error>(())
464    /// ```
465    ///
466    /// # Errors
467    ///
468    /// Will return `Err` if an IO error occurs.
469    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
470
471    /// Retrieves an item from the tree.
472    ///
473    /// # Examples
474    ///
475    /// ```
476    /// # let folder = tempfile::tempdir()?;
477    /// use lsm_tree::{AbstractTree, Config, Tree};
478    ///
479    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
480    /// tree.insert("a", "my_value", 0);
481    ///
482    /// let item = tree.get("a", 1)?;
483    /// assert_eq!(Some("my_value".as_bytes().into()), item);
484    /// #
485    /// # Ok::<(), lsm_tree::Error>(())
486    /// ```
487    ///
488    /// # Errors
489    ///
490    /// Will return `Err` if an IO error occurs.
491    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
492
493    /// Returns `true` if the tree contains the specified key.
494    ///
495    /// # Examples
496    ///
497    /// ```
498    /// # let folder = tempfile::tempdir()?;
499    /// # use lsm_tree::{AbstractTree, Config, Tree};
500    /// #
501    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
502    /// assert!(!tree.contains_key("a", 0)?);
503    ///
504    /// tree.insert("a", "abc", 0);
505    /// assert!(tree.contains_key("a", 1)?);
506    /// #
507    /// # Ok::<(), lsm_tree::Error>(())
508    /// ```
509    ///
510    /// # Errors
511    ///
512    /// Will return `Err` if an IO error occurs.
513    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
514        self.get(key, seqno).map(|x| x.is_some())
515    }
516
517    /// Inserts a key-value pair into the tree.
518    ///
519    /// If the key already exists, the item will be overwritten.
520    ///
521    /// Returns the added item's size and new size of the memtable.
522    ///
523    /// # Examples
524    ///
525    /// ```
526    /// # let folder = tempfile::tempdir()?;
527    /// use lsm_tree::{AbstractTree, Config, Tree};
528    ///
529    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
530    /// tree.insert("a", "abc", 0);
531    /// #
532    /// # Ok::<(), lsm_tree::Error>(())
533    /// ```
534    ///
535    /// # Errors
536    ///
537    /// Will return `Err` if an IO error occurs.
538    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
539        &self,
540        key: K,
541        value: V,
542        seqno: SeqNo,
543    ) -> (u64, u64);
544
545    /// Removes an item from the tree.
546    ///
547    /// Returns the added item's size and new size of the memtable.
548    ///
549    /// # Examples
550    ///
551    /// ```
552    /// # let folder = tempfile::tempdir()?;
553    /// # use lsm_tree::{AbstractTree, Config, Tree};
554    /// #
555    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
556    /// tree.insert("a", "abc", 0);
557    ///
558    /// let item = tree.get("a", 1)?.expect("should have item");
559    /// assert_eq!("abc".as_bytes(), &*item);
560    ///
561    /// tree.remove("a", 1);
562    ///
563    /// let item = tree.get("a", 2)?;
564    /// assert_eq!(None, item);
565    /// #
566    /// # Ok::<(), lsm_tree::Error>(())
567    /// ```
568    ///
569    /// # Errors
570    ///
571    /// Will return `Err` if an IO error occurs.
572    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
573
574    /// Removes an item from the tree.
575    ///
576    /// The tombstone marker of this delete operation will vanish when it
577    /// collides with its corresponding insertion.
578    /// This may cause older versions of the value to be resurrected, so it should
579    /// only be used and preferred in scenarios where a key is only ever written once.
580    ///
581    /// Returns the added item's size and new size of the memtable.
582    ///
583    /// # Examples
584    ///
585    /// ```
586    /// # let folder = tempfile::tempdir()?;
587    /// # use lsm_tree::{AbstractTree, Config, Tree};
588    /// #
589    /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
590    /// tree.insert("a", "abc", 0);
591    ///
592    /// let item = tree.get("a", 1)?.expect("should have item");
593    /// assert_eq!("abc".as_bytes(), &*item);
594    ///
595    /// tree.remove_weak("a", 1);
596    ///
597    /// let item = tree.get("a", 2)?;
598    /// assert_eq!(None, item);
599    /// #
600    /// # Ok::<(), lsm_tree::Error>(())
601    /// ```
602    ///
603    /// # Errors
604    ///
605    /// Will return `Err` if an IO error occurs.
606    #[doc(hidden)]
607    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
608}