lsm_tree/abstract.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6 iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7 Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
8 UserKey, UserValue,
9};
10use std::{
11 ops::RangeBounds,
12 sync::{Arc, MutexGuard, RwLockWriteGuard},
13};
14
15pub type RangeItem = crate::Result<KvPair>;
16
17/// Generic Tree API
18#[enum_dispatch::enum_dispatch]
19pub trait AbstractTree {
20 /// Returns the number of cached table file descriptors.
21 fn table_file_cache_size(&self) -> usize;
22
23 // TODO: remove
24 #[doc(hidden)]
25 fn version_memtable_size_sum(&self) -> u64 {
26 self.get_version_history_lock().memtable_size_sum()
27 }
28
29 #[doc(hidden)]
30 fn next_table_id(&self) -> TableId;
31
32 #[doc(hidden)]
33 fn id(&self) -> crate::TreeId;
34
35 /// Like [`AbstractTree::get`], but returns the actual internal entry, not just the user value.
36 ///
37 /// Used in tests.
38 #[doc(hidden)]
39 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
40
41 #[doc(hidden)]
42 fn current_version(&self) -> Version;
43
44 #[doc(hidden)]
45 fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, crate::version::SuperVersions>;
46
47 /// Seals the active memtable and flushes to table(s).
48 ///
49 /// If there are already other sealed memtables lined up, those will be flushed as well.
50 ///
51 /// Only used in tests.
52 #[doc(hidden)]
53 fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
54 let lock = self.get_flush_lock();
55 self.rotate_memtable();
56 self.flush(&lock, eviction_seqno)?;
57 Ok(())
58 }
59
60 /// Synchronously flushes pending sealed memtables to tables.
61 ///
62 /// Returns the sum of flushed memtable sizes that were flushed.
63 ///
64 /// The function may not return a result, if nothing was flushed.
65 ///
66 /// # Errors
67 ///
68 /// Will return `Err` if an IO error occurs.
69 fn flush(
70 &self,
71 _lock: &MutexGuard<'_, ()>,
72 seqno_threshold: SeqNo,
73 ) -> crate::Result<Option<u64>> {
74 use crate::{compaction::stream::CompactionStream, merge::Merger};
75
76 let version_history = self.get_version_history_lock();
77 let latest = version_history.latest_version();
78
79 if latest.sealed_memtables.len() == 0 {
80 return Ok(None);
81 }
82
83 let sealed_ids = latest
84 .sealed_memtables
85 .iter()
86 .map(|mt| mt.id)
87 .collect::<Vec<_>>();
88
89 let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum();
90
91 let merger = Merger::new(
92 latest
93 .sealed_memtables
94 .iter()
95 .map(|mt| mt.iter().map(Ok))
96 .collect::<Vec<_>>(),
97 );
98 let stream = CompactionStream::new(merger, seqno_threshold);
99
100 drop(version_history);
101
102 if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
103 self.register_tables(
104 &tables,
105 blob_files.as_deref(),
106 None,
107 &sealed_ids,
108 seqno_threshold,
109 )?;
110 }
111
112 Ok(Some(flushed_size))
113 }
114
115 /// Returns an iterator that scans through the entire tree.
116 ///
117 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
118 fn iter(
119 &self,
120 seqno: SeqNo,
121 index: Option<(Arc<Memtable>, SeqNo)>,
122 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
123 self.range::<&[u8], _>(.., seqno, index)
124 }
125
126 /// Returns an iterator over a prefixed set of items.
127 ///
128 /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
129 fn prefix<K: AsRef<[u8]>>(
130 &self,
131 prefix: K,
132 seqno: SeqNo,
133 index: Option<(Arc<Memtable>, SeqNo)>,
134 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
135
136 /// Returns an iterator over a range of items.
137 ///
138 /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
139 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
140 &self,
141 range: R,
142 seqno: SeqNo,
143 index: Option<(Arc<Memtable>, SeqNo)>,
144 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
145
146 /// Ingests a sorted stream of key-value pairs into the tree.
147 ///
148 /// Can only be called on a new fresh, empty tree.
149 ///
150 /// # Errors
151 ///
152 /// Will return `Err` if an IO error occurs.
153 ///
154 /// # Panics
155 ///
156 /// Panics if the tree is **not** initially empty.
157 ///
158 /// Will panic if the input iterator is not sorted in ascending order.
159 #[doc(hidden)]
160 fn ingest(
161 &self,
162 iter: impl Iterator<Item = (UserKey, UserValue)>,
163 seqno_generator: &SequenceNumberCounter,
164 visible_seqno: &SequenceNumberCounter,
165 ) -> crate::Result<()>;
166
167 /// Returns the approximate number of tombstones in the tree.
168 fn tombstone_count(&self) -> u64;
169
170 /// Returns the approximate number of weak tombstones (single deletes) in the tree.
171 fn weak_tombstone_count(&self) -> u64;
172
173 /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
174 fn weak_tombstone_reclaimable_count(&self) -> u64;
175
176 // TODO: clear() with Nuke compaction strategy (write lock) -> drop_range(..)
177
178 /// Drops tables that are fully contained in a given range.
179 ///
180 /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
181 /// If the normalized lower bound is greater than the upper bound, the
182 /// method returns without performing any work.
183 ///
184 /// # Errors
185 ///
186 /// Will return `Err` only if an IO error occurs during compaction.
187 fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
188
189 /// Performs major compaction, blocking the caller until it's done.
190 ///
191 /// # Errors
192 ///
193 /// Will return `Err` if an IO error occurs.
194 fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
195
196 /// Returns the disk space used by stale blobs.
197 fn stale_blob_bytes(&self) -> u64 {
198 0
199 }
200
201 /// Gets the space usage of all filters in the tree.
202 ///
203 /// May not correspond to the actual memory size because filter blocks may be paged out.
204 fn filter_size(&self) -> usize;
205
206 /// Gets the memory usage of all pinned filters in the tree.
207 fn pinned_filter_size(&self) -> usize;
208
209 /// Gets the memory usage of all pinned index blocks in the tree.
210 fn pinned_block_index_size(&self) -> usize;
211
212 /// Gets the length of the version free list.
213 fn version_free_list_len(&self) -> usize;
214
215 /// Returns the metrics structure.
216 #[cfg(feature = "metrics")]
217 fn metrics(&self) -> &Arc<crate::Metrics>;
218
219 /// Acquires the flush lock which is required to call [`Tree::flush`].
220 fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
221
222 /// Synchronously flushes a memtable to a table.
223 ///
224 /// This method will not make the table immediately available,
225 /// use [`AbstractTree::register_tables`] for that.
226 ///
227 /// # Errors
228 ///
229 /// Will return `Err` if an IO error occurs.
230 #[warn(clippy::type_complexity)]
231 fn flush_to_tables(
232 &self,
233 stream: impl Iterator<Item = crate::Result<InternalValue>>,
234 ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>>;
235
236 /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
237 ///
238 /// # Errors
239 ///
240 /// Will return `Err` if an IO error occurs.
241 fn register_tables(
242 &self,
243 tables: &[Table],
244 blob_files: Option<&[BlobFile]>,
245 frag_map: Option<crate::blob_tree::FragmentationMap>,
246 sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
247 gc_watermark: SeqNo,
248 ) -> crate::Result<()>;
249
250 /// Clears the active memtable atomically.
251 fn clear_active_memtable(&self);
252
253 /// Sets the active memtable.
254 ///
255 /// May be used to restore the LSM-tree's in-memory state from a write-ahead log
256 /// after tree recovery.
257 fn set_active_memtable(&self, memtable: Memtable);
258
259 /// Returns the number of sealed memtables.
260 fn sealed_memtable_count(&self) -> usize;
261
262 /// Adds a sealed memtables.
263 ///
264 /// May be used to restore the LSM-tree's in-memory state from some journals.
265 fn add_sealed_memtable(&self, memtable: Arc<Memtable>);
266
267 /// Performs compaction on the tree's levels, blocking the caller until it's done.
268 ///
269 /// # Errors
270 ///
271 /// Will return `Err` if an IO error occurs.
272 fn compact(
273 &self,
274 strategy: Arc<dyn crate::compaction::CompactionStrategy>,
275 seqno_threshold: SeqNo,
276 ) -> crate::Result<()>;
277
278 /// Returns the next table's ID.
279 fn get_next_table_id(&self) -> TableId;
280
281 /// Returns the tree config.
282 fn tree_config(&self) -> &Config;
283
284 /// Returns the highest sequence number.
285 fn get_highest_seqno(&self) -> Option<SeqNo> {
286 let memtable_seqno = self.get_highest_memtable_seqno();
287 let table_seqno = self.get_highest_persisted_seqno();
288 memtable_seqno.max(table_seqno)
289 }
290
291 /// Returns the approximate size of the active memtable in bytes.
292 ///
293 /// May be used to flush the memtable if it grows too large.
294 fn active_memtable_size(&self) -> u64;
295
296 /// Returns the tree type.
297 fn tree_type(&self) -> crate::TreeType;
298
299 /// Seals the active memtable.
300 fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
301
302 /// Returns the number of tables currently in the tree.
303 fn table_count(&self) -> usize;
304
305 /// Returns the number of tables in `levels[idx]`.
306 ///
307 /// Returns `None` if the level does not exist (if idx >= 7).
308 fn level_table_count(&self, idx: usize) -> Option<usize>;
309
310 /// Returns the number of disjoint runs in L0.
311 ///
312 /// Can be used to determine whether to write stall.
313 fn l0_run_count(&self) -> usize;
314
315 /// Returns the number of blob files currently in the tree.
316 fn blob_file_count(&self) -> usize;
317
318 /// Approximates the number of items in the tree.
319 fn approximate_len(&self) -> usize;
320
321 /// Returns the disk space usage.
322 fn disk_space(&self) -> u64;
323
324 /// Returns the highest sequence number of the active memtable.
325 fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
326
327 /// Returns the highest sequence number that is flushed to disk.
328 fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
329
330 /// Scans the entire tree, returning the number of items.
331 ///
332 /// ###### Caution
333 ///
334 /// This operation scans the entire tree: O(n) complexity!
335 ///
336 /// Never, under any circumstances, use .`len()` == 0 to check
337 /// if the tree is empty, use [`Tree::is_empty`] instead.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// # use lsm_tree::Error as TreeError;
343 /// use lsm_tree::{AbstractTree, Config, Tree};
344 ///
345 /// let folder = tempfile::tempdir()?;
346 /// let tree = Config::new(folder, Default::default()).open()?;
347 ///
348 /// assert_eq!(tree.len(0, None)?, 0);
349 /// tree.insert("1", "abc", 0);
350 /// tree.insert("3", "abc", 1);
351 /// tree.insert("5", "abc", 2);
352 /// assert_eq!(tree.len(3, None)?, 3);
353 /// #
354 /// # Ok::<(), TreeError>(())
355 /// ```
356 ///
357 /// # Errors
358 ///
359 /// Will return `Err` if an IO error occurs.
360 fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
361 let mut count = 0;
362
363 for item in self.iter(seqno, index) {
364 let _ = item.key()?;
365 count += 1;
366 }
367
368 Ok(count)
369 }
370
371 /// Returns `true` if the tree is empty.
372 ///
373 /// This operation has O(log N) complexity.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// # let folder = tempfile::tempdir()?;
379 /// use lsm_tree::{AbstractTree, Config, Tree};
380 ///
381 /// let tree = Config::new(folder, Default::default()).open()?;
382 /// assert!(tree.is_empty(0, None)?);
383 ///
384 /// tree.insert("a", "abc", 0);
385 /// assert!(!tree.is_empty(1, None)?);
386 /// #
387 /// # Ok::<(), lsm_tree::Error>(())
388 /// ```
389 ///
390 /// # Errors
391 ///
392 /// Will return `Err` if an IO error occurs.
393 fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
394 self.first_key_value(seqno, index).map(|x| x.is_none())
395 }
396
397 /// Returns the first key-value pair in the tree.
398 /// The key in this pair is the minimum key in the tree.
399 ///
400 /// # Examples
401 ///
402 /// ```
403 /// # use lsm_tree::Error as TreeError;
404 /// # use lsm_tree::{AbstractTree, Config, Tree};
405 /// #
406 /// # let folder = tempfile::tempdir()?;
407 /// let tree = Config::new(folder, Default::default()).open()?;
408 ///
409 /// tree.insert("1", "abc", 0);
410 /// tree.insert("3", "abc", 1);
411 /// tree.insert("5", "abc", 2);
412 ///
413 /// let (key, _) = tree.first_key_value(3, None)?.expect("item should exist");
414 /// assert_eq!(&*key, "1".as_bytes());
415 /// #
416 /// # Ok::<(), TreeError>(())
417 /// ```
418 ///
419 /// # Errors
420 ///
421 /// Will return `Err` if an IO error occurs.
422 fn first_key_value(
423 &self,
424 seqno: SeqNo,
425 index: Option<(Arc<Memtable>, SeqNo)>,
426 ) -> crate::Result<Option<KvPair>> {
427 self.iter(seqno, index)
428 .next()
429 .map(Guard::into_inner)
430 .transpose()
431 }
432
433 /// Returns the last key-value pair in the tree.
434 /// The key in this pair is the maximum key in the tree.
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// # use lsm_tree::Error as TreeError;
440 /// # use lsm_tree::{AbstractTree, Config, Tree};
441 /// #
442 /// # let folder = tempfile::tempdir()?;
443 /// # let tree = Config::new(folder, Default::default()).open()?;
444 /// #
445 /// tree.insert("1", "abc", 0);
446 /// tree.insert("3", "abc", 1);
447 /// tree.insert("5", "abc", 2);
448 ///
449 /// let (key, _) = tree.last_key_value(3, None)?.expect("item should exist");
450 /// assert_eq!(&*key, "5".as_bytes());
451 /// #
452 /// # Ok::<(), TreeError>(())
453 /// ```
454 ///
455 /// # Errors
456 ///
457 /// Will return `Err` if an IO error occurs.
458 fn last_key_value(
459 &self,
460 seqno: SeqNo,
461 index: Option<(Arc<Memtable>, SeqNo)>,
462 ) -> crate::Result<Option<KvPair>> {
463 self.iter(seqno, index)
464 .next_back()
465 .map(Guard::into_inner)
466 .transpose()
467 }
468
469 /// Returns the size of a value if it exists.
470 ///
471 /// # Examples
472 ///
473 /// ```
474 /// # let folder = tempfile::tempdir()?;
475 /// use lsm_tree::{AbstractTree, Config, Tree};
476 ///
477 /// let tree = Config::new(folder, Default::default()).open()?;
478 /// tree.insert("a", "my_value", 0);
479 ///
480 /// let size = tree.size_of("a", 1)?.unwrap_or_default();
481 /// assert_eq!("my_value".len() as u32, size);
482 ///
483 /// let size = tree.size_of("b", 1)?.unwrap_or_default();
484 /// assert_eq!(0, size);
485 /// #
486 /// # Ok::<(), lsm_tree::Error>(())
487 /// ```
488 ///
489 /// # Errors
490 ///
491 /// Will return `Err` if an IO error occurs.
492 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
493
494 /// Retrieves an item from the tree.
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// # let folder = tempfile::tempdir()?;
500 /// use lsm_tree::{AbstractTree, Config, Tree};
501 ///
502 /// let tree = Config::new(folder, Default::default()).open()?;
503 /// tree.insert("a", "my_value", 0);
504 ///
505 /// let item = tree.get("a", 1)?;
506 /// assert_eq!(Some("my_value".as_bytes().into()), item);
507 /// #
508 /// # Ok::<(), lsm_tree::Error>(())
509 /// ```
510 ///
511 /// # Errors
512 ///
513 /// Will return `Err` if an IO error occurs.
514 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
515
516 /// Returns `true` if the tree contains the specified key.
517 ///
518 /// # Examples
519 ///
520 /// ```
521 /// # let folder = tempfile::tempdir()?;
522 /// # use lsm_tree::{AbstractTree, Config, Tree};
523 /// #
524 /// let tree = Config::new(folder, Default::default()).open()?;
525 /// assert!(!tree.contains_key("a", 0)?);
526 ///
527 /// tree.insert("a", "abc", 0);
528 /// assert!(tree.contains_key("a", 1)?);
529 /// #
530 /// # Ok::<(), lsm_tree::Error>(())
531 /// ```
532 ///
533 /// # Errors
534 ///
535 /// Will return `Err` if an IO error occurs.
536 fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
537 self.get(key, seqno).map(|x| x.is_some())
538 }
539
540 /// Inserts a key-value pair into the tree.
541 ///
542 /// If the key already exists, the item will be overwritten.
543 ///
544 /// Returns the added item's size and new size of the memtable.
545 ///
546 /// # Examples
547 ///
548 /// ```
549 /// # let folder = tempfile::tempdir()?;
550 /// use lsm_tree::{AbstractTree, Config, Tree};
551 ///
552 /// let tree = Config::new(folder, Default::default()).open()?;
553 /// tree.insert("a", "abc", 0);
554 /// #
555 /// # Ok::<(), lsm_tree::Error>(())
556 /// ```
557 ///
558 /// # Errors
559 ///
560 /// Will return `Err` if an IO error occurs.
561 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
562 &self,
563 key: K,
564 value: V,
565 seqno: SeqNo,
566 ) -> (u64, u64);
567
568 /// Removes an item from the tree.
569 ///
570 /// Returns the added item's size and new size of the memtable.
571 ///
572 /// # Examples
573 ///
574 /// ```
575 /// # let folder = tempfile::tempdir()?;
576 /// # use lsm_tree::{AbstractTree, Config, Tree};
577 /// #
578 /// # let tree = Config::new(folder, Default::default()).open()?;
579 /// tree.insert("a", "abc", 0);
580 ///
581 /// let item = tree.get("a", 1)?.expect("should have item");
582 /// assert_eq!("abc".as_bytes(), &*item);
583 ///
584 /// tree.remove("a", 1);
585 ///
586 /// let item = tree.get("a", 2)?;
587 /// assert_eq!(None, item);
588 /// #
589 /// # Ok::<(), lsm_tree::Error>(())
590 /// ```
591 ///
592 /// # Errors
593 ///
594 /// Will return `Err` if an IO error occurs.
595 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
596
597 /// Removes an item from the tree.
598 ///
599 /// The tombstone marker of this delete operation will vanish when it
600 /// collides with its corresponding insertion.
601 /// This may cause older versions of the value to be resurrected, so it should
602 /// only be used and preferred in scenarios where a key is only ever written once.
603 ///
604 /// Returns the added item's size and new size of the memtable.
605 ///
606 /// # Examples
607 ///
608 /// ```
609 /// # let folder = tempfile::tempdir()?;
610 /// # use lsm_tree::{AbstractTree, Config, Tree};
611 /// #
612 /// # let tree = Config::new(folder, Default::default()).open()?;
613 /// tree.insert("a", "abc", 0);
614 ///
615 /// let item = tree.get("a", 1)?.expect("should have item");
616 /// assert_eq!("abc".as_bytes(), &*item);
617 ///
618 /// tree.remove_weak("a", 1);
619 ///
620 /// let item = tree.get("a", 2)?;
621 /// assert_eq!(None, item);
622 /// #
623 /// # Ok::<(), lsm_tree::Error>(())
624 /// ```
625 ///
626 /// # Errors
627 ///
628 /// Will return `Err` if an IO error occurs.
629 #[doc(hidden)]
630 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
631}