lsm_tree/abstract.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6 blob_tree::FragmentationMap,
7 compaction::CompactionStrategy,
8 config::TreeType,
9 iter_guard::IterGuardImpl,
10 table::Table,
11 tree::inner::MemtableId,
12 version::{SuperVersions, Version},
13 vlog::BlobFile,
14 AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
15 SequenceNumberCounter, TableId, Tree, TreeId, UserKey, UserValue,
16};
17use enum_dispatch::enum_dispatch;
18use std::{
19 ops::RangeBounds,
20 sync::{Arc, MutexGuard, RwLockWriteGuard},
21};
22
23pub type RangeItem = crate::Result<KvPair>;
24
25/// Generic Tree API
26#[enum_dispatch]
27pub trait AbstractTree {
28 #[doc(hidden)]
29 fn next_table_id(&self) -> TableId;
30
31 #[doc(hidden)]
32 fn id(&self) -> TreeId;
33
34 #[doc(hidden)]
35 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
36
37 #[doc(hidden)]
38 fn current_version(&self) -> Version;
39
40 #[doc(hidden)]
41 fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, SuperVersions>;
42
43 /// Seals the active memtable and flushes to table(s).
44 ///
45 /// If there are already other sealed memtables lined up, those will be flushed as well.
46 ///
47 /// Only used in tests.
48 #[doc(hidden)]
49 fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
50 let lock = self.get_flush_lock();
51 self.rotate_memtable();
52 self.flush(&lock, eviction_seqno)?;
53 Ok(())
54 }
55
56 /// Synchronously flushes pending sealed memtables to tables.
57 ///
58 /// Returns the sum of flushed memtable sizes that were flushed.
59 ///
60 /// The function may not return a result, if nothing was flushed.
61 ///
62 /// # Errors
63 ///
64 /// Will return `Err` if an IO error occurs.
65 fn flush(
66 &self,
67 _lock: &MutexGuard<'_, ()>,
68 seqno_threshold: SeqNo,
69 ) -> crate::Result<Option<u64>> {
70 use crate::{compaction::stream::CompactionStream, merge::Merger};
71
72 let version_history = self.get_version_history_lock();
73 let latest = version_history.latest_version();
74
75 if latest.sealed_memtables.len() == 0 {
76 return Ok(None);
77 }
78
79 let sealed_ids = latest
80 .sealed_memtables
81 .iter()
82 .map(|mt| mt.0)
83 .collect::<Vec<_>>();
84
85 let flushed_size = latest.sealed_memtables.iter().map(|(_, x)| x.size()).sum();
86
87 let merger = Merger::new(
88 latest
89 .sealed_memtables
90 .iter()
91 .map(|(_, mt)| mt.iter().map(Ok))
92 .collect::<Vec<_>>(),
93 );
94 let stream = CompactionStream::new(merger, seqno_threshold);
95
96 drop(version_history);
97
98 if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
99 self.register_tables(
100 &tables,
101 blob_files.as_deref(),
102 None,
103 &sealed_ids,
104 seqno_threshold,
105 )?;
106 }
107
108 Ok(Some(flushed_size))
109 }
110
111 /// Returns an iterator that scans through the entire tree.
112 ///
113 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
114 fn iter(
115 &self,
116 seqno: SeqNo,
117 index: Option<Arc<Memtable>>,
118 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
119 self.range::<&[u8], _>(.., seqno, index)
120 }
121
122 /// Returns an iterator over a prefixed set of items.
123 ///
124 /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
125 fn prefix<K: AsRef<[u8]>>(
126 &self,
127 prefix: K,
128 seqno: SeqNo,
129 index: Option<Arc<Memtable>>,
130 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
131
132 /// Returns an iterator over a range of items.
133 ///
134 /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
135 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
136 &self,
137 range: R,
138 seqno: SeqNo,
139 index: Option<Arc<Memtable>>,
140 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
141
142 /// Ingests a sorted stream of key-value pairs into the tree.
143 ///
144 /// Can only be called on a new fresh, empty tree.
145 ///
146 /// # Errors
147 ///
148 /// Will return `Err` if an IO error occurs.
149 ///
150 /// # Panics
151 ///
152 /// Panics if the tree is **not** initially empty.
153 ///
154 /// Will panic if the input iterator is not sorted in ascending order.
155 #[doc(hidden)]
156 fn ingest(
157 &self,
158 iter: impl Iterator<Item = (UserKey, UserValue)>,
159 seqno_generator: &SequenceNumberCounter,
160 visible_seqno: &SequenceNumberCounter,
161 ) -> crate::Result<()>;
162
163 /// Returns the approximate number of tombstones in the tree.
164 fn tombstone_count(&self) -> u64;
165
166 /// Returns the approximate number of weak tombstones (single deletes) in the tree.
167 fn weak_tombstone_count(&self) -> u64;
168
169 /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
170 fn weak_tombstone_reclaimable_count(&self) -> u64;
171
172 // TODO: clear() with Nuke compaction strategy (write lock) -> drop_range(..)
173
174 /// Drops tables that are fully contained in a given range.
175 ///
176 /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
177 /// If the normalized lower bound is greater than the upper bound, the
178 /// method returns without performing any work.
179 ///
180 /// # Errors
181 ///
182 /// Will return `Err` only if an IO error occurs during compaction.
183 fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
184
185 /// Performs major compaction, blocking the caller until it's done.
186 ///
187 /// # Errors
188 ///
189 /// Will return `Err` if an IO error occurs.
190 fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
191
192 /// Returns the disk space used by stale blobs.
193 fn stale_blob_bytes(&self) -> u64 {
194 0
195 }
196
197 /// Gets the space usage of all filters in the tree.
198 ///
199 /// May not correspond to the actual memory size because filter blocks may be paged out.
200 fn filter_size(&self) -> usize;
201
202 /// Gets the memory usage of all pinned filters in the tree.
203 fn pinned_filter_size(&self) -> usize;
204
205 /// Gets the memory usage of all pinned index blocks in the tree.
206 fn pinned_block_index_size(&self) -> usize;
207
208 /// Gets the length of the version free list.
209 fn version_free_list_len(&self) -> usize;
210
211 /// Returns the metrics structure.
212 #[cfg(feature = "metrics")]
213 fn metrics(&self) -> &Arc<crate::Metrics>;
214
215 /// Acquires the flush lock which is required to call [`Tree::flush`].
216 fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
217
218 /// Synchronously flushes a memtable to a table.
219 ///
220 /// This method will not make the table immediately available,
221 /// use [`AbstractTree::register_tables`] for that.
222 ///
223 /// # Errors
224 ///
225 /// Will return `Err` if an IO error occurs.
226 #[warn(clippy::type_complexity)]
227 fn flush_to_tables(
228 &self,
229 stream: impl Iterator<Item = crate::Result<InternalValue>>,
230 ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>>;
231
232 /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
233 ///
234 /// # Errors
235 ///
236 /// Will return `Err` if an IO error occurs.
237 fn register_tables(
238 &self,
239 tables: &[Table],
240 blob_files: Option<&[BlobFile]>,
241 frag_map: Option<FragmentationMap>,
242 sealed_memtables_to_delete: &[MemtableId],
243 gc_watermark: SeqNo,
244 ) -> crate::Result<()>;
245
246 /// Clears the active memtable atomically.
247 fn clear_active_memtable(&self);
248
249 /// Sets the active memtable.
250 ///
251 /// May be used to restore the LSM-tree's in-memory state from a write-ahead log
252 /// after tree recovery.
253 fn set_active_memtable(&self, memtable: Memtable);
254
255 /// Returns the number of sealed memtables.
256 fn sealed_memtable_count(&self) -> usize;
257
258 /// Adds a sealed memtables.
259 ///
260 /// May be used to restore the LSM-tree's in-memory state from some journals.
261 fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>);
262
263 /// Performs compaction on the tree's levels, blocking the caller until it's done.
264 ///
265 /// # Errors
266 ///
267 /// Will return `Err` if an IO error occurs.
268 fn compact(
269 &self,
270 strategy: Arc<dyn CompactionStrategy>,
271 seqno_threshold: SeqNo,
272 ) -> crate::Result<()>;
273
274 /// Returns the next table's ID.
275 fn get_next_table_id(&self) -> TableId;
276
277 /// Returns the tree config.
278 fn tree_config(&self) -> &Config;
279
280 /// Returns the highest sequence number.
281 fn get_highest_seqno(&self) -> Option<SeqNo> {
282 let memtable_seqno = self.get_highest_memtable_seqno();
283 let table_seqno = self.get_highest_persisted_seqno();
284 memtable_seqno.max(table_seqno)
285 }
286
287 /// Returns the approximate size of the active memtable in bytes.
288 ///
289 /// May be used to flush the memtable if it grows too large.
290 fn active_memtable_size(&self) -> u64;
291
292 /// Returns the tree type.
293 fn tree_type(&self) -> TreeType;
294
295 /// Seals the active memtable.
296 fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
297
298 /// Returns the number of tables currently in the tree.
299 fn table_count(&self) -> usize;
300
301 /// Returns the number of tables in `levels[idx]`.
302 ///
303 /// Returns `None` if the level does not exist (if idx >= 7).
304 fn level_table_count(&self, idx: usize) -> Option<usize>;
305
306 /// Returns the number of disjoint runs in L0.
307 ///
308 /// Can be used to determine whether to write stall.
309 fn l0_run_count(&self) -> usize;
310
311 /// Returns the number of blob files currently in the tree.
312 fn blob_file_count(&self) -> usize;
313
314 /// Approximates the number of items in the tree.
315 fn approximate_len(&self) -> usize;
316
317 /// Returns the disk space usage.
318 fn disk_space(&self) -> u64;
319
320 /// Returns the highest sequence number of the active memtable.
321 fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
322
323 /// Returns the highest sequence number that is flushed to disk.
324 fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
325
326 /// Scans the entire tree, returning the number of items.
327 ///
328 /// ###### Caution
329 ///
330 /// This operation scans the entire tree: O(n) complexity!
331 ///
332 /// Never, under any circumstances, use .`len()` == 0 to check
333 /// if the tree is empty, use [`Tree::is_empty`] instead.
334 ///
335 /// # Examples
336 ///
337 /// ```
338 /// # use lsm_tree::Error as TreeError;
339 /// use lsm_tree::{AbstractTree, Config, Tree};
340 ///
341 /// let folder = tempfile::tempdir()?;
342 /// let tree = Config::new(folder, Default::default()).open()?;
343 ///
344 /// assert_eq!(tree.len(0, None)?, 0);
345 /// tree.insert("1", "abc", 0);
346 /// tree.insert("3", "abc", 1);
347 /// tree.insert("5", "abc", 2);
348 /// assert_eq!(tree.len(3, None)?, 3);
349 /// #
350 /// # Ok::<(), TreeError>(())
351 /// ```
352 ///
353 /// # Errors
354 ///
355 /// Will return `Err` if an IO error occurs.
356 fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
357 let mut count = 0;
358
359 for item in self.iter(seqno, index) {
360 let _ = item.key()?;
361 count += 1;
362 }
363
364 Ok(count)
365 }
366
367 /// Returns `true` if the tree is empty.
368 ///
369 /// This operation has O(log N) complexity.
370 ///
371 /// # Examples
372 ///
373 /// ```
374 /// # let folder = tempfile::tempdir()?;
375 /// use lsm_tree::{AbstractTree, Config, Tree};
376 ///
377 /// let tree = Config::new(folder, Default::default()).open()?;
378 /// assert!(tree.is_empty(0, None)?);
379 ///
380 /// tree.insert("a", "abc", 0);
381 /// assert!(!tree.is_empty(1, None)?);
382 /// #
383 /// # Ok::<(), lsm_tree::Error>(())
384 /// ```
385 ///
386 /// # Errors
387 ///
388 /// Will return `Err` if an IO error occurs.
389 fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
390 self.first_key_value(seqno, index).map(|x| x.is_none())
391 }
392
393 /// Returns the first key-value pair in the tree.
394 /// The key in this pair is the minimum key in the tree.
395 ///
396 /// # Examples
397 ///
398 /// ```
399 /// # use lsm_tree::Error as TreeError;
400 /// # use lsm_tree::{AbstractTree, Config, Tree};
401 /// #
402 /// # let folder = tempfile::tempdir()?;
403 /// let tree = Config::new(folder, Default::default()).open()?;
404 ///
405 /// tree.insert("1", "abc", 0);
406 /// tree.insert("3", "abc", 1);
407 /// tree.insert("5", "abc", 2);
408 ///
409 /// let (key, _) = tree.first_key_value(3, None)?.expect("item should exist");
410 /// assert_eq!(&*key, "1".as_bytes());
411 /// #
412 /// # Ok::<(), TreeError>(())
413 /// ```
414 ///
415 /// # Errors
416 ///
417 /// Will return `Err` if an IO error occurs.
418 fn first_key_value(
419 &self,
420 seqno: SeqNo,
421 index: Option<Arc<Memtable>>,
422 ) -> crate::Result<Option<KvPair>> {
423 self.iter(seqno, index)
424 .next()
425 .map(Guard::into_inner)
426 .transpose()
427 }
428
429 /// Returns the last key-value pair in the tree.
430 /// The key in this pair is the maximum key in the tree.
431 ///
432 /// # Examples
433 ///
434 /// ```
435 /// # use lsm_tree::Error as TreeError;
436 /// # use lsm_tree::{AbstractTree, Config, Tree};
437 /// #
438 /// # let folder = tempfile::tempdir()?;
439 /// # let tree = Config::new(folder, Default::default()).open()?;
440 /// #
441 /// tree.insert("1", "abc", 0);
442 /// tree.insert("3", "abc", 1);
443 /// tree.insert("5", "abc", 2);
444 ///
445 /// let (key, _) = tree.last_key_value(3, None)?.expect("item should exist");
446 /// assert_eq!(&*key, "5".as_bytes());
447 /// #
448 /// # Ok::<(), TreeError>(())
449 /// ```
450 ///
451 /// # Errors
452 ///
453 /// Will return `Err` if an IO error occurs.
454 fn last_key_value(
455 &self,
456 seqno: SeqNo,
457 index: Option<Arc<Memtable>>,
458 ) -> crate::Result<Option<KvPair>> {
459 self.iter(seqno, index)
460 .next_back()
461 .map(Guard::into_inner)
462 .transpose()
463 }
464
465 /// Returns the size of a value if it exists.
466 ///
467 /// # Examples
468 ///
469 /// ```
470 /// # let folder = tempfile::tempdir()?;
471 /// use lsm_tree::{AbstractTree, Config, Tree};
472 ///
473 /// let tree = Config::new(folder, Default::default()).open()?;
474 /// tree.insert("a", "my_value", 0);
475 ///
476 /// let size = tree.size_of("a", 1)?.unwrap_or_default();
477 /// assert_eq!("my_value".len() as u32, size);
478 ///
479 /// let size = tree.size_of("b", 1)?.unwrap_or_default();
480 /// assert_eq!(0, size);
481 /// #
482 /// # Ok::<(), lsm_tree::Error>(())
483 /// ```
484 ///
485 /// # Errors
486 ///
487 /// Will return `Err` if an IO error occurs.
488 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
489
490 /// Retrieves an item from the tree.
491 ///
492 /// # Examples
493 ///
494 /// ```
495 /// # let folder = tempfile::tempdir()?;
496 /// use lsm_tree::{AbstractTree, Config, Tree};
497 ///
498 /// let tree = Config::new(folder, Default::default()).open()?;
499 /// tree.insert("a", "my_value", 0);
500 ///
501 /// let item = tree.get("a", 1)?;
502 /// assert_eq!(Some("my_value".as_bytes().into()), item);
503 /// #
504 /// # Ok::<(), lsm_tree::Error>(())
505 /// ```
506 ///
507 /// # Errors
508 ///
509 /// Will return `Err` if an IO error occurs.
510 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
511
512 /// Returns `true` if the tree contains the specified key.
513 ///
514 /// # Examples
515 ///
516 /// ```
517 /// # let folder = tempfile::tempdir()?;
518 /// # use lsm_tree::{AbstractTree, Config, Tree};
519 /// #
520 /// let tree = Config::new(folder, Default::default()).open()?;
521 /// assert!(!tree.contains_key("a", 0)?);
522 ///
523 /// tree.insert("a", "abc", 0);
524 /// assert!(tree.contains_key("a", 1)?);
525 /// #
526 /// # Ok::<(), lsm_tree::Error>(())
527 /// ```
528 ///
529 /// # Errors
530 ///
531 /// Will return `Err` if an IO error occurs.
532 fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
533 self.get(key, seqno).map(|x| x.is_some())
534 }
535
536 /// Inserts a key-value pair into the tree.
537 ///
538 /// If the key already exists, the item will be overwritten.
539 ///
540 /// Returns the added item's size and new size of the memtable.
541 ///
542 /// # Examples
543 ///
544 /// ```
545 /// # let folder = tempfile::tempdir()?;
546 /// use lsm_tree::{AbstractTree, Config, Tree};
547 ///
548 /// let tree = Config::new(folder, Default::default()).open()?;
549 /// tree.insert("a", "abc", 0);
550 /// #
551 /// # Ok::<(), lsm_tree::Error>(())
552 /// ```
553 ///
554 /// # Errors
555 ///
556 /// Will return `Err` if an IO error occurs.
557 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
558 &self,
559 key: K,
560 value: V,
561 seqno: SeqNo,
562 ) -> (u64, u64);
563
564 /// Removes an item from the tree.
565 ///
566 /// Returns the added item's size and new size of the memtable.
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// # let folder = tempfile::tempdir()?;
572 /// # use lsm_tree::{AbstractTree, Config, Tree};
573 /// #
574 /// # let tree = Config::new(folder, Default::default()).open()?;
575 /// tree.insert("a", "abc", 0);
576 ///
577 /// let item = tree.get("a", 1)?.expect("should have item");
578 /// assert_eq!("abc".as_bytes(), &*item);
579 ///
580 /// tree.remove("a", 1);
581 ///
582 /// let item = tree.get("a", 2)?;
583 /// assert_eq!(None, item);
584 /// #
585 /// # Ok::<(), lsm_tree::Error>(())
586 /// ```
587 ///
588 /// # Errors
589 ///
590 /// Will return `Err` if an IO error occurs.
591 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
592
593 /// Removes an item from the tree.
594 ///
595 /// The tombstone marker of this delete operation will vanish when it
596 /// collides with its corresponding insertion.
597 /// This may cause older versions of the value to be resurrected, so it should
598 /// only be used and preferred in scenarios where a key is only ever written once.
599 ///
600 /// Returns the added item's size and new size of the memtable.
601 ///
602 /// # Examples
603 ///
604 /// ```
605 /// # let folder = tempfile::tempdir()?;
606 /// # use lsm_tree::{AbstractTree, Config, Tree};
607 /// #
608 /// # let tree = Config::new(folder, Default::default()).open()?;
609 /// tree.insert("a", "abc", 0);
610 ///
611 /// let item = tree.get("a", 1)?.expect("should have item");
612 /// assert_eq!("abc".as_bytes(), &*item);
613 ///
614 /// tree.remove_weak("a", 1);
615 ///
616 /// let item = tree.get("a", 2)?;
617 /// assert_eq!(None, item);
618 /// #
619 /// # Ok::<(), lsm_tree::Error>(())
620 /// ```
621 ///
622 /// # Errors
623 ///
624 /// Will return `Err` if an IO error occurs.
625 #[doc(hidden)]
626 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
627}