lsm_tree/abstract_tree.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6 iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
7 Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue,
8};
9use std::{
10 ops::RangeBounds,
11 sync::{Arc, MutexGuard, RwLockWriteGuard},
12};
13
14pub type RangeItem = crate::Result<KvPair>;
15
16type FlushToTablesResult = (Vec<Table>, Option<Vec<BlobFile>>);
17
18/// Generic Tree API
19#[enum_dispatch::enum_dispatch]
20pub trait AbstractTree {
21 /// Debug method for tracing the MVCC history of a key.
22 #[doc(hidden)]
23 fn print_trace(&self, key: &[u8]) -> crate::Result<()>;
24
25 /// Returns the number of cached table file descriptors.
26 fn table_file_cache_size(&self) -> usize;
27
28 // TODO: remove
29 #[doc(hidden)]
30 fn version_memtable_size_sum(&self) -> u64 {
31 self.get_version_history_lock().memtable_size_sum()
32 }
33
34 #[doc(hidden)]
35 fn next_table_id(&self) -> TableId;
36
37 #[doc(hidden)]
38 fn id(&self) -> crate::TreeId;
39
40 /// Like [`AbstractTree::get`], but returns the actual internal entry, not just the user value.
41 ///
42 /// Used in tests.
43 #[doc(hidden)]
44 fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;
45
46 #[doc(hidden)]
47 fn current_version(&self) -> Version;
48
49 #[doc(hidden)]
50 fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, crate::version::SuperVersions>;
51
52 /// Seals the active memtable and flushes to table(s).
53 ///
54 /// If there are already other sealed memtables lined up, those will be flushed as well.
55 ///
56 /// Only used in tests.
57 #[doc(hidden)]
58 fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
59 let lock = self.get_flush_lock();
60 self.rotate_memtable();
61 self.flush(&lock, eviction_seqno)?;
62 Ok(())
63 }
64
65 /// Synchronously flushes pending sealed memtables to tables.
66 ///
67 /// Returns the sum of flushed memtable sizes that were flushed.
68 ///
69 /// The function may not return a result, if nothing was flushed.
70 ///
71 /// # Errors
72 ///
73 /// Will return `Err` if an IO error occurs.
74 fn flush(
75 &self,
76 _lock: &MutexGuard<'_, ()>,
77 seqno_threshold: SeqNo,
78 ) -> crate::Result<Option<u64>> {
79 use crate::{compaction::stream::CompactionStream, merge::Merger};
80
81 let version_history = self.get_version_history_lock();
82 let latest = version_history.latest_version();
83
84 if latest.sealed_memtables.len() == 0 {
85 return Ok(None);
86 }
87
88 let sealed_ids = latest
89 .sealed_memtables
90 .iter()
91 .map(|mt| mt.id)
92 .collect::<Vec<_>>();
93
94 let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum();
95
96 let merger = Merger::new(
97 latest
98 .sealed_memtables
99 .iter()
100 .map(|mt| mt.iter().map(Ok))
101 .collect::<Vec<_>>(),
102 );
103 let stream = CompactionStream::new(merger, seqno_threshold);
104
105 drop(version_history);
106
107 if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
108 self.register_tables(
109 &tables,
110 blob_files.as_deref(),
111 None,
112 &sealed_ids,
113 seqno_threshold,
114 )?;
115 }
116
117 Ok(Some(flushed_size))
118 }
119
120 /// Returns an iterator that scans through the entire tree.
121 ///
122 /// Avoid using this function, or limit it as otherwise it may scan a lot of items.
123 fn iter(
124 &self,
125 seqno: SeqNo,
126 index: Option<(Arc<Memtable>, SeqNo)>,
127 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
128 self.range::<&[u8], _>(.., seqno, index)
129 }
130
131 /// Returns an iterator over a prefixed set of items.
132 ///
133 /// Avoid using an empty prefix as it may scan a lot of items (unless limited).
134 fn prefix<K: AsRef<[u8]>>(
135 &self,
136 prefix: K,
137 seqno: SeqNo,
138 index: Option<(Arc<Memtable>, SeqNo)>,
139 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
140
141 /// Returns an iterator over a range of items.
142 ///
143 /// Avoid using full or unbounded ranges as they may scan a lot of items (unless limited).
144 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
145 &self,
146 range: R,
147 seqno: SeqNo,
148 index: Option<(Arc<Memtable>, SeqNo)>,
149 ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
150
151 /// Returns the approximate number of tombstones in the tree.
152 fn tombstone_count(&self) -> u64;
153
154 /// Returns the approximate number of weak tombstones (single deletes) in the tree.
155 fn weak_tombstone_count(&self) -> u64;
156
157 /// Returns the approximate number of values reclaimable once weak tombstones can be GC'd.
158 fn weak_tombstone_reclaimable_count(&self) -> u64;
159
160 /// Drops tables that are fully contained in a given range.
161 ///
162 /// Accepts any `RangeBounds`, including unbounded or exclusive endpoints.
163 /// If the normalized lower bound is greater than the upper bound, the
164 /// method returns without performing any work.
165 ///
166 /// # Errors
167 ///
168 /// Will return `Err` only if an IO error occurs.
169 fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()>;
170
171 /// Drops all tables and clears all memtables atomically.
172 ///
173 /// # Errors
174 ///
175 /// Will return `Err` only if an IO error occurs.
176 fn clear(&self) -> crate::Result<()>;
177
178 /// Performs major compaction, blocking the caller until it's done.
179 ///
180 /// # Errors
181 ///
182 /// Will return `Err` if an IO error occurs.
183 fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()>;
184
185 /// Returns the disk space used by stale blobs.
186 fn stale_blob_bytes(&self) -> u64 {
187 0
188 }
189
190 /// Gets the disk space usage of all filters in the tree.
191 ///
192 /// May not correspond to the actual memory size because filter blocks may be paged out.
193 fn filter_size(&self) -> u64;
194
195 /// Gets the memory usage of all pinned filters in the tree.
196 fn pinned_filter_size(&self) -> usize;
197
198 /// Gets the memory usage of all pinned index blocks in the tree.
199 fn pinned_block_index_size(&self) -> usize;
200
201 /// Gets the length of the version free list.
202 fn version_free_list_len(&self) -> usize;
203
204 /// Returns the metrics structure.
205 #[cfg(feature = "metrics")]
206 fn metrics(&self) -> &Arc<crate::Metrics>;
207
208 /// Acquires the flush lock which is required to call [`Tree::flush`].
209 fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
210
211 /// Synchronously flushes a memtable to a table.
212 ///
213 /// This method will not make the table immediately available,
214 /// use [`AbstractTree::register_tables`] for that.
215 ///
216 /// # Errors
217 ///
218 /// Will return `Err` if an IO error occurs.
219 #[warn(clippy::type_complexity)]
220 fn flush_to_tables(
221 &self,
222 stream: impl Iterator<Item = crate::Result<InternalValue>>,
223 ) -> crate::Result<Option<FlushToTablesResult>>;
224
225 /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
226 ///
227 /// # Errors
228 ///
229 /// Will return `Err` if an IO error occurs.
230 fn register_tables(
231 &self,
232 tables: &[Table],
233 blob_files: Option<&[BlobFile]>,
234 frag_map: Option<crate::blob_tree::FragmentationMap>,
235 sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
236 gc_watermark: SeqNo,
237 ) -> crate::Result<()>;
238
239 /// Clears the active memtable atomically.
240 fn clear_active_memtable(&self);
241
242 /// Returns the number of sealed memtables.
243 fn sealed_memtable_count(&self) -> usize;
244
245 /// Performs compaction on the tree's levels, blocking the caller until it's done.
246 ///
247 /// # Errors
248 ///
249 /// Will return `Err` if an IO error occurs.
250 fn compact(
251 &self,
252 strategy: Arc<dyn crate::compaction::CompactionStrategy>,
253 seqno_threshold: SeqNo,
254 ) -> crate::Result<()>;
255
256 /// Returns the next table's ID.
257 fn get_next_table_id(&self) -> TableId;
258
259 /// Returns the tree config.
260 fn tree_config(&self) -> &Config;
261
262 /// Returns the highest sequence number.
263 fn get_highest_seqno(&self) -> Option<SeqNo> {
264 let memtable_seqno = self.get_highest_memtable_seqno();
265 let table_seqno = self.get_highest_persisted_seqno();
266 memtable_seqno.max(table_seqno)
267 }
268
269 /// Returns the active memtable.
270 fn active_memtable(&self) -> Arc<Memtable>;
271
272 /// Returns the tree type.
273 fn tree_type(&self) -> crate::TreeType {
274 // NOTE: This is only really safe to do, because we validate
275 // that the config's kv_separation_opts is consistent with the tree type during recovery.
276 if self.tree_config().kv_separation_opts.is_some() {
277 crate::TreeType::Blob
278 } else {
279 crate::TreeType::Standard
280 }
281 }
282
283 /// Seals the active memtable.
284 fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
285
286 /// Returns the number of tables currently in the tree.
287 fn table_count(&self) -> usize;
288
289 /// Returns the number of tables in `levels[idx]`.
290 ///
291 /// Returns `None` if the level does not exist (if idx >= 7).
292 fn level_table_count(&self, idx: usize) -> Option<usize>;
293
294 /// Returns the number of disjoint runs in L0.
295 ///
296 /// Can be used to determine whether to write stall.
297 fn l0_run_count(&self) -> usize;
298
299 /// Returns the number of blob files currently in the tree.
300 fn blob_file_count(&self) -> usize;
301
302 /// Approximates the number of items in the tree.
303 fn approximate_len(&self) -> usize;
304
305 /// Returns the disk space usage.
306 fn disk_space(&self) -> u64;
307
308 /// Returns the highest sequence number of the active memtable.
309 fn get_highest_memtable_seqno(&self) -> Option<SeqNo>;
310
311 /// Returns the highest sequence number that is flushed to disk.
312 fn get_highest_persisted_seqno(&self) -> Option<SeqNo>;
313
314 /// Scans the entire tree, returning the number of items.
315 ///
316 /// ###### Caution
317 ///
318 /// This operation scans the entire tree: O(n) complexity!
319 ///
320 /// Never, under any circumstances, use .`len()` == 0 to check
321 /// if the tree is empty, use [`Tree::is_empty`] instead.
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// # use lsm_tree::Error as TreeError;
327 /// use lsm_tree::{AbstractTree, Config, Tree};
328 ///
329 /// let folder = tempfile::tempdir()?;
330 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
331 ///
332 /// assert_eq!(tree.len(0, None)?, 0);
333 /// tree.insert("1", "abc", 0);
334 /// tree.insert("3", "abc", 1);
335 /// tree.insert("5", "abc", 2);
336 /// assert_eq!(tree.len(3, None)?, 3);
337 /// #
338 /// # Ok::<(), TreeError>(())
339 /// ```
340 ///
341 /// # Errors
342 ///
343 /// Will return `Err` if an IO error occurs.
344 fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
345 let mut count = 0;
346
347 for item in self.iter(seqno, index) {
348 let _ = item.key()?;
349 count += 1;
350 }
351
352 Ok(count)
353 }
354
355 /// Returns `true` if the tree is empty.
356 ///
357 /// This operation has O(log N) complexity.
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// # let folder = tempfile::tempdir()?;
363 /// use lsm_tree::{AbstractTree, Config, Tree};
364 ///
365 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
366 /// assert!(tree.is_empty(0, None)?);
367 ///
368 /// tree.insert("a", "abc", 0);
369 /// assert!(!tree.is_empty(1, None)?);
370 /// #
371 /// # Ok::<(), lsm_tree::Error>(())
372 /// ```
373 ///
374 /// # Errors
375 ///
376 /// Will return `Err` if an IO error occurs.
377 fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
378 Ok(self
379 .first_key_value(seqno, index)
380 .map(crate::Guard::key)
381 .transpose()?
382 .is_none())
383 }
384
385 /// Returns the first key-value pair in the tree.
386 /// The key in this pair is the minimum key in the tree.
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// # use lsm_tree::Error as TreeError;
392 /// # use lsm_tree::{AbstractTree, Config, Tree, Guard};
393 /// #
394 /// # let folder = tempfile::tempdir()?;
395 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
396 ///
397 /// tree.insert("1", "abc", 0);
398 /// tree.insert("3", "abc", 1);
399 /// tree.insert("5", "abc", 2);
400 ///
401 /// let key = tree.first_key_value(3, None).expect("item should exist").key()?;
402 /// assert_eq!(&*key, "1".as_bytes());
403 /// #
404 /// # Ok::<(), TreeError>(())
405 /// ```
406 ///
407 /// # Errors
408 ///
409 /// Will return `Err` if an IO error occurs.
410 fn first_key_value(
411 &self,
412 seqno: SeqNo,
413 index: Option<(Arc<Memtable>, SeqNo)>,
414 ) -> Option<IterGuardImpl> {
415 self.iter(seqno, index).next()
416 }
417
418 /// Returns the last key-value pair in the tree.
419 /// The key in this pair is the maximum key in the tree.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// # use lsm_tree::Error as TreeError;
425 /// # use lsm_tree::{AbstractTree, Config, Tree, Guard};
426 /// #
427 /// # let folder = tempfile::tempdir()?;
428 /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
429 /// #
430 /// tree.insert("1", "abc", 0);
431 /// tree.insert("3", "abc", 1);
432 /// tree.insert("5", "abc", 2);
433 ///
434 /// let key = tree.last_key_value(3, None).expect("item should exist").key()?;
435 /// assert_eq!(&*key, "5".as_bytes());
436 /// #
437 /// # Ok::<(), TreeError>(())
438 /// ```
439 ///
440 /// # Errors
441 ///
442 /// Will return `Err` if an IO error occurs.
443 fn last_key_value(
444 &self,
445 seqno: SeqNo,
446 index: Option<(Arc<Memtable>, SeqNo)>,
447 ) -> Option<IterGuardImpl> {
448 self.iter(seqno, index).next_back()
449 }
450
451 /// Returns the size of a value if it exists.
452 ///
453 /// # Examples
454 ///
455 /// ```
456 /// # let folder = tempfile::tempdir()?;
457 /// use lsm_tree::{AbstractTree, Config, Tree};
458 ///
459 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
460 /// tree.insert("a", "my_value", 0);
461 ///
462 /// let size = tree.size_of("a", 1)?.unwrap_or_default();
463 /// assert_eq!("my_value".len() as u32, size);
464 ///
465 /// let size = tree.size_of("b", 1)?.unwrap_or_default();
466 /// assert_eq!(0, size);
467 /// #
468 /// # Ok::<(), lsm_tree::Error>(())
469 /// ```
470 ///
471 /// # Errors
472 ///
473 /// Will return `Err` if an IO error occurs.
474 fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>>;
475
476 /// Retrieves an item from the tree.
477 ///
478 /// # Examples
479 ///
480 /// ```
481 /// # let folder = tempfile::tempdir()?;
482 /// use lsm_tree::{AbstractTree, Config, Tree};
483 ///
484 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
485 /// tree.insert("a", "my_value", 0);
486 ///
487 /// let item = tree.get("a", 1)?;
488 /// assert_eq!(Some("my_value".as_bytes().into()), item);
489 /// #
490 /// # Ok::<(), lsm_tree::Error>(())
491 /// ```
492 ///
493 /// # Errors
494 ///
495 /// Will return `Err` if an IO error occurs.
496 fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;
497
498 /// Returns `true` if the tree contains the specified key.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// # let folder = tempfile::tempdir()?;
504 /// # use lsm_tree::{AbstractTree, Config, Tree};
505 /// #
506 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
507 /// assert!(!tree.contains_key("a", 0)?);
508 ///
509 /// tree.insert("a", "abc", 0);
510 /// assert!(tree.contains_key("a", 1)?);
511 /// #
512 /// # Ok::<(), lsm_tree::Error>(())
513 /// ```
514 ///
515 /// # Errors
516 ///
517 /// Will return `Err` if an IO error occurs.
518 fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
519 self.get(key, seqno).map(|x| x.is_some())
520 }
521
522 /// Inserts a key-value pair into the tree.
523 ///
524 /// If the key already exists, the item will be overwritten.
525 ///
526 /// Returns the added item's size and new size of the memtable.
527 ///
528 /// # Examples
529 ///
530 /// ```
531 /// # let folder = tempfile::tempdir()?;
532 /// use lsm_tree::{AbstractTree, Config, Tree};
533 ///
534 /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
535 /// tree.insert("a", "abc", 0);
536 /// #
537 /// # Ok::<(), lsm_tree::Error>(())
538 /// ```
539 ///
540 /// # Errors
541 ///
542 /// Will return `Err` if an IO error occurs.
543 fn insert<K: Into<UserKey>, V: Into<UserValue>>(
544 &self,
545 key: K,
546 value: V,
547 seqno: SeqNo,
548 ) -> (u64, u64);
549
550 /// Removes an item from the tree.
551 ///
552 /// Returns the added item's size and new size of the memtable.
553 ///
554 /// # Examples
555 ///
556 /// ```
557 /// # let folder = tempfile::tempdir()?;
558 /// # use lsm_tree::{AbstractTree, Config, Tree};
559 /// #
560 /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
561 /// tree.insert("a", "abc", 0);
562 ///
563 /// let item = tree.get("a", 1)?.expect("should have item");
564 /// assert_eq!("abc".as_bytes(), &*item);
565 ///
566 /// tree.remove("a", 1);
567 ///
568 /// let item = tree.get("a", 2)?;
569 /// assert_eq!(None, item);
570 /// #
571 /// # Ok::<(), lsm_tree::Error>(())
572 /// ```
573 ///
574 /// # Errors
575 ///
576 /// Will return `Err` if an IO error occurs.
577 fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
578
579 /// Removes an item from the tree.
580 ///
581 /// The tombstone marker of this delete operation will vanish when it
582 /// collides with its corresponding insertion.
583 /// This may cause older versions of the value to be resurrected, so it should
584 /// only be used and preferred in scenarios where a key is only ever written once.
585 ///
586 /// Returns the added item's size and new size of the memtable.
587 ///
588 /// # Examples
589 ///
590 /// ```
591 /// # let folder = tempfile::tempdir()?;
592 /// # use lsm_tree::{AbstractTree, Config, Tree};
593 /// #
594 /// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
595 /// tree.insert("a", "abc", 0);
596 ///
597 /// let item = tree.get("a", 1)?.expect("should have item");
598 /// assert_eq!("abc".as_bytes(), &*item);
599 ///
600 /// tree.remove_weak("a", 1);
601 ///
602 /// let item = tree.get("a", 2)?;
603 /// assert_eq!(None, item);
604 /// #
605 /// # Ok::<(), lsm_tree::Error>(())
606 /// ```
607 ///
608 /// # Errors
609 ///
610 /// Will return `Err` if an IO error occurs.
611 #[doc(hidden)]
612 fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
613}