lsm_tree/segment/mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod block;
6pub mod block_index;
7pub mod file_offsets;
8mod forward_reader;
9pub mod id;
10pub mod inner;
11pub mod meta;
12pub mod multi_writer;
13pub mod range;
14pub mod reader;
15pub mod scanner;
16pub mod trailer;
17pub mod value_block;
18pub mod value_block_consumer;
19pub mod writer;
20
21use crate::{
22    block_cache::BlockCache,
23    bloom::{BloomFilter, CompositeHash},
24    descriptor_table::FileDescriptorTable,
25    time::unix_timestamp,
26    tree::inner::TreeId,
27    value::{InternalValue, SeqNo, UserKey},
28};
29use block_index::BlockIndexImpl;
30use forward_reader::ForwardReader;
31use id::GlobalSegmentId;
32use inner::Inner;
33use meta::SegmentId;
34use range::Range;
35use scanner::Scanner;
36use std::{ops::Bound, path::Path, sync::Arc};
37
/// Alias for the segment's inner state, which [`Segment`] wraps in an [`Arc`].
#[allow(clippy::module_name_repetitions)]
pub type SegmentInner = Inner;
40
/// Disk segment (a.k.a. `SSTable`, `SST`, `sorted string table`) that is located on disk
///
/// A segment is an immutable list of key-value pairs, split into compressed blocks.
/// A reference to the block (`block handle`) is saved in the "block index".
///
/// Deleted entries are represented by tombstones.
///
/// Segments can be merged together to improve read performance and reduce disk space by removing outdated item versions.
///
/// Cloning a `Segment` is cheap: it only bumps the reference count of the inner [`Arc`].
#[doc(alias("sstable", "sst", "sorted string table"))]
#[derive(Clone)]
pub struct Segment(Arc<Inner>);
52
53impl From<Inner> for Segment {
54    fn from(value: Inner) -> Self {
55        Self(Arc::new(value))
56    }
57}
58
impl std::ops::Deref for Segment {
    type Target = Inner;

    // Transparently exposes `Inner`'s fields and methods on `Segment`
    // (e.g. `self.metadata`, `self.descriptor_table` below).
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
66
impl std::fmt::Debug for Segment {
    // Renders as `Segment:<id>(<key range>)` for logging/diagnostics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Segment:{}({})", self.id(), self.metadata.key_range)
    }
}
72
73impl Segment {
74    // TODO: in Leveled compaction, compact segments that live very long and have
75    // many versions (possibly unnecessary space usage of old, stale versions)
76    /// Calculates how many versions per key there are on average.
77    #[must_use]
78    pub fn version_factor(&self) -> f32 {
79        self.metadata.item_count as f32 / self.metadata.key_count as f32
80    }
81
82    /// Gets the segment age in nanoseconds.
83    #[must_use]
84    pub fn age(&self) -> u128 {
85        let now = unix_timestamp().as_nanos();
86        let created_at = self.metadata.created_at * 1_000;
87        now.saturating_sub(created_at)
88    }
89
90    /// Gets the global segment ID.
91    #[must_use]
92    pub fn global_id(&self) -> GlobalSegmentId {
93        (self.tree_id, self.id()).into()
94    }
95
    /// Gets the segment ID.
    ///
    /// The segment ID is unique for this tree, but not
    /// across multiple trees, use [`Segment::global_id`] for that.
    #[must_use]
    pub fn id(&self) -> SegmentId {
        // The ID is persisted in (and read back from) the segment's metadata.
        self.metadata.id
    }
104
    /// Walks every data block of this segment and verifies its checksum.
    ///
    /// Returns the number of problems found: blocks that could not be loaded
    /// or whose checksum mismatched, plus one if fewer data blocks were
    /// visited than the metadata claims to exist.
    pub(crate) fn verify(&self) -> crate::Result<usize> {
        use block::checksum::Checksum;
        use block_index::IndexBlock;
        use value_block::ValueBlock;

        let mut data_block_count = 0;
        let mut broken_count = 0;

        // Acquire the segment's file handle through the descriptor table
        let guard = self
            .descriptor_table
            .access(&self.global_id())?
            .expect("should have gotten file");

        let mut file = guard.file.lock().expect("lock is poisoned");

        // TODO: maybe move to BlockIndexImpl::verify
        match &*self.block_index {
            BlockIndexImpl::Full(block_index) => {
                // Full index: data block handles can be iterated directly
                for handle in block_index.iter() {
                    let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                     "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                 );
                            broken_count += 1;
                            data_block_count += 1;
                            continue;
                        }
                    };

                    // Re-serialize the block and recompute its checksum,
                    // then compare with the checksum stored in the header
                    let (_, data) = ValueBlock::to_bytes_compressed(
                        &value_block.items,
                        value_block.header.previous_block_offset,
                        value_block.header.compression,
                    )?;
                    let actual_checksum = Checksum::from_bytes(&data);

                    if value_block.header.checksum != actual_checksum {
                        log::error!("{handle:?} is corrupted, invalid checksum value");
                        broken_count += 1;
                    }

                    data_block_count += 1;

                    if data_block_count % 1_000 == 0 {
                        log::debug!("Checked {data_block_count} data blocks");
                    }
                }
            }
            BlockIndexImpl::TwoLevel(block_index) => {
                // Two-level index: walk the top-level index, load each
                // referenced index block, then check every data block it lists

                // NOTE: TODO: because of 1.74.0
                #[allow(clippy::explicit_iter_loop)]
                for handle in block_index.top_level_index.iter() {
                    let block = match IndexBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                 "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
             );
                            broken_count += 1;
                            continue;
                        }
                    };

                    for handle in &*block.items {
                        let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                            Ok(v) => v,
                            Err(e) => {
                                log::error!(
                     "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                 );
                                broken_count += 1;
                                data_block_count += 1;
                                continue;
                            }
                        };

                        // Re-serialize and recompute checksum (see above)
                        let (_, data) = ValueBlock::to_bytes_compressed(
                            &value_block.items,
                            value_block.header.previous_block_offset,
                            value_block.header.compression,
                        )?;
                        let actual_checksum = Checksum::from_bytes(&data);

                        if value_block.header.checksum != actual_checksum {
                            log::error!("{handle:?} is corrupted, invalid checksum value");
                            broken_count += 1;
                        }

                        data_block_count += 1;

                        if data_block_count % 1_000 == 0 {
                            log::debug!("Checked {data_block_count} data blocks");
                        }
                    }
                }
            }
        }

        // Sanity check: every data block recorded in the metadata must have
        // been visited, otherwise the index itself is suspect
        if data_block_count != self.metadata.data_block_count {
            log::error!(
                "Not all data blocks were visited during verification of disk segment {:?}",
                self.id(),
            );
            broken_count += 1;
        }

        Ok(broken_count)
    }
215
216    pub(crate) fn load_bloom<P: AsRef<Path>>(
217        path: P,
218        ptr: value_block::BlockOffset,
219    ) -> crate::Result<Option<BloomFilter>> {
220        Ok(if *ptr > 0 {
221            use crate::coding::Decode;
222            use std::{
223                fs::File,
224                io::{Seek, SeekFrom},
225            };
226
227            let mut reader = File::open(path)?;
228            reader.seek(SeekFrom::Start(*ptr))?;
229            Some(BloomFilter::decode_from(&mut reader)?)
230        } else {
231            None
232        })
233    }
234
    /// Tries to recover a segment from a file.
    ///
    /// Reads the file trailer, rebuilds the block index (full or two-level,
    /// depending on `use_full_block_index`), and loads the bloom filter, if
    /// one was written.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the file cannot be read or its contents are malformed.
    pub(crate) fn recover<P: AsRef<Path>>(
        file_path: P,
        tree_id: TreeId,
        block_cache: Arc<BlockCache>,
        descriptor_table: Arc<FileDescriptorTable>,
        use_full_block_index: bool,
    ) -> crate::Result<Self> {
        use block_index::{full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex};
        use trailer::SegmentFileTrailer;

        let file_path = file_path.as_ref();

        log::debug!("Recovering segment from file {file_path:?}");
        let trailer = SegmentFileTrailer::from_file(file_path)?;

        // Range tombstones are reserved in the file format but unsupported,
        // so a non-zero pointer indicates a file we cannot handle
        assert_eq!(
            0, *trailer.offsets.range_tombstones_ptr,
            "Range tombstones not supported"
        );

        log::debug!(
            "Creating block index, with tli_ptr={}",
            trailer.offsets.tli_ptr
        );

        let block_index = if use_full_block_index {
            let block_index =
                FullBlockIndex::from_file(file_path, &trailer.metadata, &trailer.offsets)?;

            BlockIndexImpl::Full(block_index)
        } else {
            // The two-level index goes through the descriptor table and block
            // cache to load lower-level index blocks on demand
            let block_index = TwoLevelBlockIndex::from_file(
                file_path,
                &trailer.metadata,
                trailer.offsets.tli_ptr,
                (tree_id, trailer.metadata.id).into(),
                descriptor_table.clone(),
                block_cache.clone(),
            )?;
            BlockIndexImpl::TwoLevel(block_index)
        };

        // Grab the pointer before `trailer.offsets` is moved into `Inner`
        let bloom_ptr = trailer.offsets.bloom_ptr;

        Ok(Self(Arc::new(Inner {
            tree_id,

            descriptor_table,
            metadata: trailer.metadata,
            offsets: trailer.offsets,

            block_index: Arc::new(block_index),
            block_cache,

            bloom_filter: Self::load_bloom(file_path, bloom_ptr)?,
        })))
    }
293
294    #[must_use]
295    /// Gets the bloom filter size
296    pub fn bloom_filter_size(&self) -> usize {
297        self.bloom_filter
298            .as_ref()
299            .map(super::bloom::BloomFilter::len)
300            .unwrap_or_default()
301    }
302
303    pub fn get<K: AsRef<[u8]>>(
304        &self,
305        key: K,
306        seqno: Option<SeqNo>,
307        hash: CompositeHash,
308    ) -> crate::Result<Option<InternalValue>> {
309        if let Some(seqno) = seqno {
310            if self.metadata.seqnos.0 >= seqno {
311                return Ok(None);
312            }
313        }
314
315        if !self.metadata.key_range.contains_key(&key) {
316            return Ok(None);
317        }
318
319        if let Some(bf) = &self.bloom_filter {
320            if !bf.contains_hash(hash) {
321                return Ok(None);
322            }
323        }
324
325        self.point_read(key, seqno)
326    }
327
    /// Performs the actual disk lookup for `key`, optionally bounded by a
    /// snapshot `seqno`.
    ///
    /// Callers are expected to have done the cheap pruning first (see
    /// [`Segment::get`]).
    fn point_read<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        use block_index::BlockIndex;
        use value_block::{CachePolicy, ValueBlock};
        use value_block_consumer::ValueBlockConsumer;

        let key = key.as_ref();

        // Find the lowest data block that may contain the key
        let Some(first_block_handle) = self
            .block_index
            .get_lowest_block_containing_key(key, CachePolicy::Write)?
        else {
            return Ok(None);
        };

        let Some(block) = ValueBlock::load_by_block_handle(
            &self.descriptor_table,
            &self.block_cache,
            self.global_id(),
            first_block_handle,
            CachePolicy::Write,
        )?
        else {
            return Ok(None);
        };

        if seqno.is_none() {
            // NOTE: Fastpath for non-seqno reads (which are most common)
            // This avoids setting up a rather expensive block iterator
            // (see explanation for that below)
            // This only really works because sequence numbers are sorted
            // in descending order
            return Ok(block.get_latest(key).cloned());
        }

        // Set up a forward reader whose low end starts inside the block we
        // already loaded, positioned at `key`
        let mut reader = ForwardReader::new(
            self.offsets.index_block_ptr,
            &self.descriptor_table,
            self.global_id(),
            &self.block_cache,
            first_block_handle,
        );
        reader.lo_block_size = block.header.data_length.into();
        reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(block, Some(key), None));
        reader.lo_initialized = true;

        // NOTE: For finding a specific seqno,
        // we need to use a reader
        // because nothing really prevents the version
        // we are searching for to be in the next block
        // after the one our key starts in, or the block after that
        //
        // Example (key:seqno), searching for a:2:
        //
        // [..., a:5, a:4] [a:3, a:2, b: 4, b:3]
        // ^               ^
        // Block A         Block B
        //
        // Based on get_lower_bound_block, "a" is in Block A
        // However, we are searching for A with seqno 2, which
        // unfortunately is in the next block
        //
        // Also because of weak tombstones, we may have to look further than the first item we encounter
        let mut reader = reader.filter(|x| {
            match x {
                Ok(entry) => {
                    // Check for seqno if needed; only versions strictly below
                    // the snapshot seqno are visible
                    if let Some(seqno) = seqno {
                        entry.key.seqno < seqno
                    } else {
                        true
                    }
                }
                // Errors must pass through so they can be propagated below
                Err(_) => true,
            }
        });

        let Some(entry) = reader.next().transpose()? else {
            return Ok(None);
        };

        // NOTE: We are past the searched key, so don't return anything
        if &*entry.key.user_key > key {
            return Ok(None);
        }

        Ok(Some(entry))
    }
419
    /// Returns `true` if the key falls within this segment's key range.
    ///
    /// NOTE: A `true` result does not mean the key actually exists in the
    /// segment, only that it could.
    pub fn is_key_in_key_range<K: AsRef<[u8]>>(&self, key: K) -> bool {
        self.metadata.key_range.contains_key(key)
    }
423
424    // TODO: move segment tests into module, then make pub(crate)
425
426    /// Creates an iterator over the `Segment`.
427    ///
428    /// # Errors
429    ///
430    /// Will return `Err` if an IO error occurs.
431    #[must_use]
432    #[allow(clippy::iter_without_into_iter)]
433    #[doc(hidden)]
434    pub fn iter(&self) -> Range {
435        self.range((std::ops::Bound::Unbounded, std::ops::Bound::Unbounded))
436    }
437
438    #[doc(hidden)]
439    pub fn scan<P: AsRef<Path>>(&self, base_folder: P) -> crate::Result<Scanner> {
440        let segment_file_path = base_folder.as_ref().join(self.metadata.id.to_string());
441        let block_count = self.metadata.data_block_count.try_into().expect("oops");
442        Scanner::new(segment_file_path, block_count)
443    }
444
    /// Creates a ranged iterator over the `Segment`.
    ///
    /// Construction itself cannot fail (note the non-`Result` return type);
    /// I/O errors, if any, presumably surface while iterating the returned
    /// [`Range`].
    #[must_use]
    pub(crate) fn range(&self, range: (Bound<UserKey>, Bound<UserKey>)) -> Range {
        Range::new(
            self.offsets.index_block_ptr,
            self.descriptor_table.clone(),
            self.global_id(),
            self.block_cache.clone(),
            self.block_index.clone(),
            range,
        )
    }
461
    /// Returns the highest sequence number in the segment.
    #[must_use]
    pub fn get_highest_seqno(&self) -> SeqNo {
        // `seqnos` is a (lowest, highest) pair; `.0` is used as the lower
        // visibility watermark in `get`
        self.metadata.seqnos.1
    }
467
    /// Returns the amount of tombstone markers in the `Segment`.
    ///
    /// Tombstones represent deleted entries (see type-level docs).
    #[must_use]
    #[doc(hidden)]
    pub fn tombstone_count(&self) -> u64 {
        self.metadata.tombstone_count
    }
474
475    /// Returns the ratio of tombstone markers in the `Segment`.
476    #[must_use]
477    #[doc(hidden)]
478    pub fn tombstone_ratio(&self) -> f32 {
479        self.metadata.tombstone_count as f32 / self.metadata.key_count as f32
480    }
481
    /// Checks if a key range is (partially or fully) contained in this segment.
    ///
    /// NOTE(review): presumably used to prune segments during range reads —
    /// verify against callers.
    pub(crate) fn check_key_range_overlap(
        &self,
        bounds: &(Bound<UserKey>, Bound<UserKey>),
    ) -> bool {
        self.metadata.key_range.overlaps_with_bounds(bounds)
    }
489}