lsm_tree/segment/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod block;
6pub mod block_index;
7pub mod file_offsets;
8mod forward_reader;
9pub mod id;
10pub mod inner;
11pub mod meta;
12pub mod multi_writer;
13pub mod range;
14pub mod reader;
15pub mod scanner;
16pub mod trailer;
17pub mod value_block;
18pub mod value_block_consumer;
19pub mod writer;
20
21use crate::{
22    bloom::{BloomFilter, CompositeHash},
23    cache::Cache,
24    descriptor_table::FileDescriptorTable,
25    time::unix_timestamp,
26    tree::inner::TreeId,
27    value::{InternalValue, SeqNo, UserKey},
28};
29use block_index::BlockIndexImpl;
30use forward_reader::ForwardReader;
31use id::GlobalSegmentId;
32use inner::Inner;
33use meta::SegmentId;
34use range::Range;
35use scanner::Scanner;
36use std::{
37    ops::Bound,
38    path::Path,
39    sync::{atomic::AtomicBool, Arc},
40};
41
// Public alias so downstream code can name the inner type without the
// `segment::inner::Inner` path; the lint is allowed because the repeated
// module name is intentional here.
#[allow(clippy::module_name_repetitions)]
pub type SegmentInner = Inner;
44
/// Disk segment (a.k.a. `SSTable`, `SST`, `sorted string table`) that is located on disk
///
/// A segment is an immutable list of key-value pairs, split into compressed blocks.
/// A reference to the block (`block handle`) is saved in the "block index".
///
/// Deleted entries are represented by tombstones.
///
/// Segments can be merged together to improve read performance and reduce disk space by removing outdated item versions.
#[doc(alias("sstable", "sst", "sorted string table"))]
#[derive(Clone)]
pub struct Segment(Arc<Inner>); // NOTE: Clone is cheap — it only bumps the Arc refcount
56
57impl From<Inner> for Segment {
58    fn from(value: Inner) -> Self {
59        Self(Arc::new(value))
60    }
61}
62
63impl std::ops::Deref for Segment {
64    type Target = Inner;
65
66    fn deref(&self) -> &Self::Target {
67        &self.0
68    }
69}
70
71impl std::fmt::Debug for Segment {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "Segment:{}({})", self.id(), self.metadata.key_range)
74    }
75}
76
77impl Segment {
78    // TODO: in Leveled compaction, compact segments that live very long and have
79    // many versions (possibly unnecessary space usage of old, stale versions)
80    /// Calculates how many versions per key there are on average.
81    #[must_use]
82    pub fn version_factor(&self) -> f32 {
83        self.metadata.item_count as f32 / self.metadata.key_count as f32
84    }
85
86    /// Gets the segment age in nanoseconds.
87    #[must_use]
88    pub fn age(&self) -> u128 {
89        let now = unix_timestamp().as_nanos();
90        let created_at = self.metadata.created_at * 1_000;
91        now.saturating_sub(created_at)
92    }
93
94    /// Gets the global segment ID.
95    #[must_use]
96    pub fn global_id(&self) -> GlobalSegmentId {
97        (self.tree_id, self.id()).into()
98    }
99
    /// Gets the segment ID.
    ///
    /// The segment ID is unique for this tree, but not
    /// across multiple trees, use [`Segment::global_id`] for that.
    #[must_use]
    pub fn id(&self) -> SegmentId {
        // The ID is assigned at write time and persisted in the metadata
        self.metadata.id
    }
108
    /// Walks every data block of the segment and verifies its checksum.
    ///
    /// Returns the number of broken blocks (unreadable or with a checksum
    /// mismatch); also counts a missing-block discrepancy at the end.
    pub(crate) fn verify(&self) -> crate::Result<usize> {
        use block::checksum::Checksum;
        use block_index::IndexBlock;
        use value_block::ValueBlock;

        let mut data_block_count = 0;
        let mut broken_count = 0;

        // Acquire the shared file handle for this segment's on-disk file
        let guard = self
            .descriptor_table
            .access(&self.global_id())?
            .expect("should have gotten file");

        let mut file = guard.file.lock().expect("lock is poisoned");

        // TODO: maybe move to BlockIndexImpl::verify
        match &*self.block_index {
            BlockIndexImpl::Full(block_index) => {
                // Full index: every data block handle is directly available
                for handle in block_index.iter() {
                    let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                                "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                            );
                            // Unreadable block counts as both visited and broken
                            broken_count += 1;
                            data_block_count += 1;
                            continue;
                        }
                    };

                    // Re-serialize the block with the same compression settings
                    // and compare against the stored checksum
                    let (_, data) = ValueBlock::to_bytes_compressed(
                        &value_block.items,
                        value_block.header.previous_block_offset,
                        value_block.header.compression,
                    )?;
                    let actual_checksum = Checksum::from_bytes(&data);

                    if value_block.header.checksum != actual_checksum {
                        log::error!("{handle:?} is corrupted, invalid checksum value");
                        broken_count += 1;
                    }

                    data_block_count += 1;

                    if data_block_count % 1_000 == 0 {
                        log::debug!("Checked {data_block_count} data blocks");
                    }
                }
            }
            BlockIndexImpl::TwoLevel(block_index) => {
                // Two-level index: walk the top-level index to each index block,
                // then that block's handles to reach the data blocks
                // NOTE: TODO: because of 1.74.0
                #[allow(clippy::explicit_iter_loop)]
                for handle in block_index.top_level_index.iter() {
                    let block = match IndexBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                                "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                            );
                            // NOTE: a broken index block hides an unknown number of
                            // data blocks; only the index block itself is counted
                            broken_count += 1;
                            continue;
                        }
                    };

                    for handle in &*block.items {
                        let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                            Ok(v) => v,
                            Err(e) => {
                                log::error!(
                                    "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                                );
                                broken_count += 1;
                                data_block_count += 1;
                                continue;
                            }
                        };

                        // Same round-trip checksum verification as in the full-index case
                        let (_, data) = ValueBlock::to_bytes_compressed(
                            &value_block.items,
                            value_block.header.previous_block_offset,
                            value_block.header.compression,
                        )?;
                        let actual_checksum = Checksum::from_bytes(&data);

                        if value_block.header.checksum != actual_checksum {
                            log::error!("{handle:?} is corrupted, invalid checksum value");
                            broken_count += 1;
                        }

                        data_block_count += 1;

                        if data_block_count % 1_000 == 0 {
                            log::debug!("Checked {data_block_count} data blocks");
                        }
                    }
                }
            }
        }

        // Cross-check against the metadata's block count to catch truncation
        if data_block_count != self.metadata.data_block_count {
            log::error!(
                "Not all data blocks were visited during verification of disk segment {:?}",
                self.id(),
            );
            broken_count += 1;
        }

        Ok(broken_count)
    }
219
220    pub(crate) fn load_bloom(
221        path: &Path,
222        ptr: block::offset::BlockOffset,
223    ) -> crate::Result<Option<BloomFilter>> {
224        Ok(if *ptr > 0 {
225            use crate::coding::Decode;
226            use std::{
227                fs::File,
228                io::{Seek, SeekFrom},
229            };
230
231            let mut reader = File::open(path)?;
232            reader.seek(SeekFrom::Start(*ptr))?;
233            Some(BloomFilter::decode_from(&mut reader)?)
234        } else {
235            None
236        })
237    }
238
239    /// Tries to recover a segment from a file.
240    pub(crate) fn recover(
241        file_path: &Path,
242        tree_id: TreeId,
243        cache: Arc<Cache>,
244        descriptor_table: Arc<FileDescriptorTable>,
245        use_full_block_index: bool,
246    ) -> crate::Result<Self> {
247        use block_index::{full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex};
248        use trailer::SegmentFileTrailer;
249
250        log::debug!("Recovering segment from file {file_path:?}");
251        let trailer = SegmentFileTrailer::from_file(file_path)?;
252
253        assert_eq!(
254            0, *trailer.offsets.range_tombstones_ptr,
255            "Range tombstones not supported"
256        );
257
258        log::debug!(
259            "Creating block index, with tli_ptr={}",
260            trailer.offsets.tli_ptr
261        );
262
263        let block_index = if use_full_block_index {
264            let block_index =
265                FullBlockIndex::from_file(file_path, &trailer.metadata, &trailer.offsets)?;
266
267            BlockIndexImpl::Full(block_index)
268        } else {
269            let block_index = TwoLevelBlockIndex::from_file(
270                file_path,
271                &trailer.metadata,
272                trailer.offsets.tli_ptr,
273                (tree_id, trailer.metadata.id).into(),
274                descriptor_table.clone(),
275                cache.clone(),
276            )?;
277            BlockIndexImpl::TwoLevel(block_index)
278        };
279
280        let bloom_ptr = trailer.offsets.bloom_ptr;
281
282        Ok(Self(Arc::new(Inner {
283            path: file_path.into(),
284
285            tree_id,
286
287            descriptor_table,
288            metadata: trailer.metadata,
289            offsets: trailer.offsets,
290
291            block_index: Arc::new(block_index),
292            cache,
293
294            bloom_filter: Self::load_bloom(file_path, bloom_ptr)?,
295
296            is_deleted: AtomicBool::default(),
297        })))
298    }
299
300    pub(crate) fn mark_as_deleted(&self) {
301        self.0
302            .is_deleted
303            .store(true, std::sync::atomic::Ordering::Release);
304    }
305
306    #[must_use]
307    /// Gets the bloom filter size
308    pub fn bloom_filter_size(&self) -> usize {
309        self.bloom_filter
310            .as_ref()
311            .map(super::bloom::BloomFilter::len)
312            .unwrap_or_default()
313    }
314
315    pub fn get(
316        &self,
317        key: &[u8],
318        seqno: Option<SeqNo>,
319        hash: CompositeHash,
320    ) -> crate::Result<Option<InternalValue>> {
321        if let Some(seqno) = seqno {
322            if self.metadata.seqnos.0 >= seqno {
323                return Ok(None);
324            }
325        }
326
327        if let Some(bf) = &self.bloom_filter {
328            if !bf.contains_hash(hash) {
329                return Ok(None);
330            }
331        }
332
333        self.point_read(key, seqno)
334    }
335
    /// Reads the newest visible version of `key` from this segment.
    ///
    /// With `seqno == None` the latest version is returned; otherwise the
    /// newest version whose sequence number is strictly below `seqno`.
    fn point_read(&self, key: &[u8], seqno: Option<SeqNo>) -> crate::Result<Option<InternalValue>> {
        use block_index::BlockIndex;
        use value_block::{CachePolicy, ValueBlock};
        use value_block_consumer::ValueBlockConsumer;

        // Locate the lowest data block that could contain the key
        let Some(first_block_handle) = self
            .block_index
            .get_lowest_block_containing_key(key, CachePolicy::Write)?
        else {
            return Ok(None);
        };

        let Some(block) = ValueBlock::load_by_block_handle(
            &self.descriptor_table,
            &self.cache,
            self.global_id(),
            first_block_handle,
            CachePolicy::Write,
        )?
        else {
            return Ok(None);
        };

        if seqno.is_none() {
            // NOTE: Fastpath for non-seqno reads (which are most common)
            // This avoids setting up a rather expensive block iterator
            // (see explanation for that below)
            // This only really works because sequence numbers are sorted
            // in descending order
            return Ok(block.get_latest(key).cloned());
        }

        // Manually seed the reader with the block we already loaded,
        // positioned at the first entry >= key, so it is not read twice
        let mut reader = ForwardReader::new(
            self.offsets.index_block_ptr,
            &self.descriptor_table,
            self.global_id(),
            &self.cache,
            first_block_handle,
        );
        reader.lo_block_size = block.header.data_length.into();
        reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(block, Some(key), None));
        reader.lo_initialized = true;

        // NOTE: For finding a specific seqno,
        // we need to use a reader
        // because nothing really prevents the version
        // we are searching for to be in the next block
        // after the one our key starts in, or the block after that
        //
        // Example (key:seqno), searching for a:2:
        //
        // [..., a:5, a:4] [a:3, a:2, b: 4, b:3]
        // ^               ^
        // Block A         Block B
        //
        // Based on get_lower_bound_block, "a" is in Block A
        // However, we are searching for A with seqno 2, which
        // unfortunately is in the next block
        //
        // Also because of weak tombstones, we may have to look further than the first item we encounter
        let mut reader = reader.filter(|x| {
            match x {
                Ok(entry) => {
                    // Check for seqno if needed
                    if let Some(seqno) = seqno {
                        entry.key.seqno < seqno
                    } else {
                        true
                    }
                }
                // Errors must pass through so they can be surfaced below
                Err(_) => true,
            }
        });

        let Some(entry) = reader.next().transpose()? else {
            return Ok(None);
        };

        // NOTE: We are past the searched key, so don't return anything
        if &*entry.key.user_key > key {
            return Ok(None);
        }

        Ok(Some(entry))
    }
421
    /// Returns `true` if the given key falls within this segment's key range.
    #[must_use]
    pub fn is_key_in_key_range(&self, key: &[u8]) -> bool {
        self.metadata.key_range.contains_key(key)
    }
426
427    /// Creates an iterator over the `Segment`.
428    ///
429    /// # Errors
430    ///
431    /// Will return `Err` if an IO error occurs.
432    #[must_use]
433    #[allow(clippy::iter_without_into_iter)]
434    #[doc(hidden)]
435    pub fn iter(&self) -> Range {
436        self.range((std::ops::Bound::Unbounded, std::ops::Bound::Unbounded))
437    }
438
439    #[doc(hidden)]
440    pub fn scan<P: AsRef<Path>>(&self, base_folder: P) -> crate::Result<Scanner> {
441        let segment_file_path = base_folder.as_ref().join(self.metadata.id.to_string());
442        let block_count = self.metadata.data_block_count.try_into().expect("oops");
443        Scanner::new(&segment_file_path, block_count)
444    }
445
446    /// Creates a ranged iterator over the `Segment`.
447    ///
448    /// # Errors
449    ///
450    /// Will return `Err` if an IO error occurs.
451    #[must_use]
452    pub(crate) fn range(&self, range: (Bound<UserKey>, Bound<UserKey>)) -> Range {
453        Range::new(
454            self.offsets.index_block_ptr,
455            self.descriptor_table.clone(),
456            self.global_id(),
457            self.cache.clone(),
458            self.block_index.clone(),
459            range,
460        )
461    }
462
463    /// Returns the highest sequence number in the segment.
464    #[must_use]
465    pub fn get_highest_seqno(&self) -> SeqNo {
466        self.metadata.seqnos.1
467    }
468
    /// Returns the amount of tombstone markers in the `Segment`.
    #[must_use]
    #[doc(hidden)]
    pub fn tombstone_count(&self) -> u64 {
        // Persisted in the segment metadata at write time
        self.metadata.tombstone_count
    }
475
476    /// Returns the ratio of tombstone markers in the `Segment`.
477    #[must_use]
478    #[doc(hidden)]
479    pub fn tombstone_ratio(&self) -> f32 {
480        self.metadata.tombstone_count as f32 / self.metadata.key_count as f32
481    }
482
483    /// Checks if a key range is (partially or fully) contained in this segment.
484    pub(crate) fn check_key_range_overlap(
485        &self,
486        bounds: &(Bound<UserKey>, Bound<UserKey>),
487    ) -> bool {
488        self.metadata.key_range.overlaps_with_bounds(bounds)
489    }
490}