// lsm_tree/segment/writer/mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod meta;
6
7use super::{
8    block::{header::Header as BlockHeader, offset::BlockOffset},
9    block_index::writer::Writer as IndexWriter,
10    file_offsets::FileOffsets,
11    meta::{CompressionType, Metadata},
12    trailer::SegmentFileTrailer,
13    value_block::ValueBlock,
14};
15use crate::{
16    bloom::BloomFilter,
17    coding::Encode,
18    file::fsync_directory,
19    segment::block::ItemSize,
20    value::{InternalValue, UserKey},
21    SegmentId,
22};
23use std::{
24    fs::File,
25    io::{BufWriter, Seek, Write},
26    path::PathBuf,
27};
28
/// Serializes and compresses values into blocks and writes them to disk as segment
pub struct Writer {
    /// Creation options (folder, block sizes, segment ID)
    pub(crate) opts: Options,

    /// Compression to use
    compression: CompressionType,

    /// Segment file
    segment_file_path: PathBuf,

    /// Writer of data blocks
    block_writer: BufWriter<File>,

    /// Writer of index blocks
    index_writer: IndexWriter,

    /// Buffer of KVs
    chunk: Vec<InternalValue>,
    // Accumulated serialized size of `chunk`; once it reaches
    // `opts.data_block_size`, the chunk is spilled to disk as one block
    chunk_size: usize,

    /// Segment metadata, updated incrementally while writing
    pub(crate) meta: meta::Metadata,

    /// Stores the previous block position (used for creating back links)
    prev_pos: (BlockOffset, BlockOffset),

    /// User key of the most recently written item
    /// (used to detect when a *new* key starts, so MVCC versions
    /// of the same key are only counted/hashed once)
    current_key: Option<UserKey>,

    /// Policy deciding how (and whether) the Bloom filter is constructed
    bloom_policy: BloomConstructionPolicy,

    /// Hashes for bloom filter
    ///
    /// using enhanced double hashing, so we got two u64s
    bloom_hash_buffer: Vec<(u64, u64)>,
}
63
/// Policy for sizing a segment's Bloom filter
#[derive(Copy, Clone, Debug)]
pub enum BloomConstructionPolicy {
    /// Allocate a fixed number of bits per key (0 disables the filter)
    BitsPerKey(u8),
    /// Size the filter to approximate a target false positive rate
    FpRate(f32),
}
69
impl Default for BloomConstructionPolicy {
    fn default() -> Self {
        // NOTE(review): 10 bits per key is a common Bloom filter default
        // (roughly ~1% false positive rate) — presumably chosen for that reason
        Self::BitsPerKey(10)
    }
}
75
76impl BloomConstructionPolicy {
77    #[must_use]
78    pub fn build(&self, n: usize) -> BloomFilter {
79        match self {
80            Self::BitsPerKey(bpk) => BloomFilter::with_bpk(n, *bpk),
81            Self::FpRate(fpr) => BloomFilter::with_fp_rate(n, *fpr),
82        }
83    }
84
85    #[must_use]
86    pub fn is_active(&self) -> bool {
87        match self {
88            Self::BitsPerKey(bpk) => *bpk > 0,
89            Self::FpRate(_) => true,
90        }
91    }
92}
93
/// Options for creating a segment [`Writer`]
pub struct Options {
    /// Folder the segment file is created in
    pub folder: PathBuf,
    /// Target serialized size of data blocks, in bytes
    pub data_block_size: u32,
    /// Target serialized size of index blocks, in bytes
    pub index_block_size: u32,
    /// Segment ID; also used as the segment's file name
    pub segment_id: SegmentId,
}
100
impl Writer {
    /// Sets up a new `Writer` at the given folder
    ///
    /// The segment file is created (truncated) at `<folder>/<segment_id>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the segment file cannot be created or the
    /// index writer cannot be set up.
    pub fn new(opts: Options) -> crate::Result<Self> {
        let segment_file_path = opts.folder.join(opts.segment_id.to_string());

        let block_writer = File::create(&segment_file_path)?;
        // Buffer writes (64 KiB - 1) to avoid a syscall per block
        let block_writer = BufWriter::with_capacity(u16::MAX.into(), block_writer);

        let index_writer = IndexWriter::new(opts.index_block_size)?;

        let chunk = Vec::new();

        Ok(Self {
            opts,
            meta: meta::Metadata::default(),

            compression: CompressionType::None,

            segment_file_path,

            block_writer,
            index_writer,
            chunk,

            prev_pos: (BlockOffset(0), BlockOffset(0)),

            chunk_size: 0,

            current_key: None,

            bloom_policy: BloomConstructionPolicy::default(),

            bloom_hash_buffer: Vec::new(),
        })
    }

    /// Sets the compression type used for data blocks and index blocks
    #[must_use]
    pub(crate) fn use_compression(mut self, compression: CompressionType) -> Self {
        self.compression = compression;
        self.index_writer = self.index_writer.use_compression(compression);
        self
    }

    /// Sets the Bloom filter construction policy
    #[must_use]
    pub(crate) fn use_bloom_policy(mut self, bloom_policy: BloomConstructionPolicy) -> Self {
        self.bloom_policy = bloom_policy;
        self
    }

    /// Writes a compressed block to disk.
    ///
    /// This is triggered when a `Writer::write` causes the buffer to grow to the configured `block_size`.
    ///
    /// Should only be called when the block has items in it.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization or a file write fails.
    pub(crate) fn spill_block(&mut self) -> crate::Result<()> {
        // NOTE: Nothing buffered -> nothing to write (makes `finish` safe to call)
        let Some(last) = self.chunk.last() else {
            return Ok(());
        };

        let (header, data) =
            ValueBlock::to_bytes_compressed(&self.chunk, self.prev_pos.0, self.compression)?;

        self.meta.uncompressed_size += u64::from(header.uncompressed_length);

        header.encode_into(&mut self.block_writer)?;

        // Write to file
        self.block_writer.write_all(&data)?;

        let bytes_written = (BlockHeader::serialized_len() + data.len()) as u64;

        // NOTE: Register the block under its highest key, at the block's
        // *start* offset (file_pos is advanced afterwards)
        self.index_writer
            .register_block(last.key.user_key.clone(), self.meta.file_pos)?;

        // Adjust metadata
        self.meta.file_pos += bytes_written;
        self.meta.item_count += self.chunk.len();
        self.meta.data_block_count += 1;

        // Back link stuff
        self.prev_pos.0 = self.prev_pos.1;
        self.prev_pos.1 += bytes_written;

        // Set last key
        self.meta.last_key = Some(
            // NOTE: Expect is fine, because the chunk is not empty
            //
            // Also, we are allowed to remove the last item
            // to get ownership of it, because the chunk is cleared after
            // this anyway
            #[allow(clippy::expect_used)]
            self.chunk
                .pop()
                .expect("chunk should not be empty")
                .key
                .user_key,
        );

        // IMPORTANT: Clear chunk after everything else
        self.chunk.clear();
        self.chunk_size = 0;

        Ok(())
    }

    /// Writes an item.
    ///
    /// # Note
    ///
    /// It's important that the incoming stream of items is correctly
    /// sorted as described by the [`UserKey`], otherwise the block layout will
    /// be non-sense.
    ///
    /// # Errors
    ///
    /// Returns an error if spilling a full block to disk fails.
    pub fn write(&mut self, item: InternalValue) -> crate::Result<()> {
        if item.is_tombstone() {
            self.meta.tombstone_count += 1;
        }

        // NOTE: Check if we visit a new key
        if Some(&item.key.user_key) != self.current_key.as_ref() {
            self.meta.key_count += 1;
            self.current_key = Some(item.key.user_key.clone());

            // IMPORTANT: Do not buffer *every* item's key
            // because there may be multiple versions
            // of the same key
            if self.bloom_policy.is_active() {
                self.bloom_hash_buffer
                    .push(BloomFilter::get_hash(&item.key.user_key));
            }
        }

        // NOTE: Grab the seqno before `item` is moved into the chunk
        let seqno = item.key.seqno;

        if self.meta.first_key.is_none() {
            self.meta.first_key = Some(item.key.user_key.clone());
        }

        self.chunk_size += item.size();
        self.chunk.push(item);

        if self.chunk_size >= self.opts.data_block_size as usize {
            self.spill_block()?;
        }

        self.meta.lowest_seqno = self.meta.lowest_seqno.min(seqno);
        self.meta.highest_seqno = self.meta.highest_seqno.max(seqno);

        Ok(())
    }

    // TODO: should take mut self to avoid double finish

    /// Finishes the segment, making sure all data is written durably
    ///
    /// Returns `Ok(None)` if no items were written; in that case the
    /// (empty) segment file is deleted.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    pub fn finish(&mut self) -> crate::Result<Option<SegmentFileTrailer>> {
        // Flush any remaining buffered items as a final (partial) block
        self.spill_block()?;

        // No items written! Just delete segment file and return nothing
        if self.meta.item_count == 0 {
            std::fs::remove_file(&self.segment_file_path)?;
            return Ok(None);
        }

        let index_block_ptr = BlockOffset(self.block_writer.stream_position()?);
        log::trace!("index_block_ptr={index_block_ptr}");

        // Append index blocks to file
        let tli_ptr = self.index_writer.finish(&mut self.block_writer)?;
        log::trace!("tli_ptr={tli_ptr}");

        self.meta.index_block_count = self.index_writer.block_count;

        // Write bloom filter
        let bloom_ptr = {
            if self.bloom_hash_buffer.is_empty() {
                // NOTE: Offset 0 signals "no Bloom filter"
                BlockOffset(0)
            } else {
                let bloom_ptr = self.block_writer.stream_position()?;
                let n = self.bloom_hash_buffer.len();

                log::trace!(
                    "Constructing Bloom filter with {n} entries: {:?}",
                    self.bloom_policy,
                );

                let start = std::time::Instant::now();

                let mut filter = self.bloom_policy.build(n);

                // NOTE: `take` empties the hash buffer, freeing its memory
                for hash in std::mem::take(&mut self.bloom_hash_buffer) {
                    filter.set_with_hash(hash);
                }

                log::trace!("Built Bloom filter in {:?}", start.elapsed());

                filter.encode_into(&mut self.block_writer)?;

                BlockOffset(bloom_ptr)
            }
        };
        log::trace!("bloom_ptr={bloom_ptr}");

        // TODO: #46 https://github.com/fjall-rs/lsm-tree/issues/46 - Write range filter
        let rf_ptr = BlockOffset(0);
        log::trace!("rf_ptr={rf_ptr}");

        // TODO: #2 https://github.com/fjall-rs/lsm-tree/issues/2 - Write range tombstones
        let range_tombstones_ptr = BlockOffset(0);
        log::trace!("range_tombstones_ptr={range_tombstones_ptr}");

        // TODO:
        let pfx_ptr = BlockOffset(0);
        log::trace!("pfx_ptr={pfx_ptr}");

        // Write metadata
        let metadata_ptr = BlockOffset(self.block_writer.stream_position()?);

        let metadata = Metadata::from_writer(self.opts.segment_id, self)?;
        metadata.encode_into(&mut self.block_writer)?;

        // Bundle all the file offsets
        let offsets = FileOffsets {
            index_block_ptr,
            tli_ptr,
            bloom_ptr,
            range_filter_ptr: rf_ptr,
            range_tombstones_ptr,
            pfx_ptr,
            metadata_ptr,
        };

        // Write trailer
        let trailer = SegmentFileTrailer { metadata, offsets };
        trailer.encode_into(&mut self.block_writer)?;

        // Finally, flush & fsync the blocks file
        self.block_writer.flush()?;
        self.block_writer.get_mut().sync_all()?;

        // IMPORTANT: fsync folder on Unix
        fsync_directory(&self.opts.folder)?;

        log::debug!(
            "Written {} items in {} blocks into new segment file, written {} MiB",
            self.meta.item_count,
            self.meta.data_block_count,
            *self.meta.file_pos / 1_024 / 1_024
        );

        Ok(Some(trailer))
    }
}
352
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use crate::cache::Cache;
    use crate::descriptor_table::FileDescriptorTable;
    use crate::segment::block_index::top_level::TopLevelIndex;
    use crate::segment::reader::Reader;
    use crate::value::{InternalValue, ValueType};
    use std::sync::Arc;
    use test_log::test;

    /// The trailer metadata should track the lowest and highest seqno
    /// across all written items, regardless of write order
    #[test]
    fn segment_writer_seqnos() -> crate::Result<()> {
        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder,
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?;

        writer.write(InternalValue::from_components(
            "a",
            nanoid::nanoid!().as_bytes(),
            7,
            ValueType::Value,
        ))?;
        writer.write(InternalValue::from_components(
            "b",
            nanoid::nanoid!().as_bytes(),
            5,
            ValueType::Value,
        ))?;
        writer.write(InternalValue::from_components(
            "c",
            nanoid::nanoid!().as_bytes(),
            8,
            ValueType::Value,
        ))?;
        writer.write(InternalValue::from_components(
            "d",
            nanoid::nanoid!().as_bytes(),
            10,
            ValueType::Value,
        ))?;

        let trailer = writer.finish()?.expect("should exist");

        // seqnos = (lowest, highest)
        assert_eq!(5, trailer.metadata.seqnos.0);
        assert_eq!(10, trailer.metadata.seqnos.1);

        Ok(())
    }

    /// A `BitsPerKey(0)` policy should disable the Bloom filter entirely
    /// (signalled by a zero bloom_ptr in the trailer)
    #[test]
    fn segment_writer_zero_bpk() -> crate::Result<()> {
        const ITEM_COUNT: u64 = 100;

        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder,
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?
        .use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));

        let items = (0u64..ITEM_COUNT).map(|i| {
            InternalValue::from_components(
                i.to_be_bytes(),
                nanoid::nanoid!().as_bytes(),
                0,
                ValueType::Value,
            )
        });

        for item in items {
            writer.write(item)?;
        }

        let trailer = writer.finish()?.expect("should exist");

        assert_eq!(ITEM_COUNT, trailer.metadata.item_count);
        assert_eq!(ITEM_COUNT, trailer.metadata.key_count);
        assert_eq!(trailer.offsets.bloom_ptr, BlockOffset(0));

        Ok(())
    }

    /// Round-trip: items written through the `Writer` should be readable
    /// again via `Reader`, and the TLI should match the index block count
    #[test]
    fn segment_writer_write_read() -> crate::Result<()> {
        const ITEM_COUNT: u64 = 100;

        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder: folder.clone(),
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?;

        // NOTE: Big-endian keys keep the input sorted, as `write` requires
        let items = (0u64..ITEM_COUNT).map(|i| {
            InternalValue::from_components(
                i.to_be_bytes(),
                nanoid::nanoid!().as_bytes(),
                0,
                ValueType::Value,
            )
        });

        for item in items {
            writer.write(item)?;
        }

        let trailer = writer.finish()?.expect("should exist");

        assert_eq!(ITEM_COUNT, trailer.metadata.item_count);
        assert_eq!(ITEM_COUNT, trailer.metadata.key_count);

        // Default bloom policy is active, so a filter must have been written
        assert!(*trailer.offsets.bloom_ptr > 0);

        let segment_file_path = folder.join(segment_id.to_string());

        // NOTE: The TLI is bound by the index block count, because we know the index block count is u32
        // the TLI length fits into u32 as well
        #[allow(clippy::cast_possible_truncation)]
        {
            let tli = TopLevelIndex::from_file(
                &segment_file_path,
                &trailer.metadata,
                trailer.offsets.tli_ptr,
            )?;

            assert_eq!(tli.len() as u32, trailer.metadata.index_block_count);
        }

        let table = Arc::new(FileDescriptorTable::new(512, 1));
        table.insert(segment_file_path, (0, segment_id).into());

        let block_cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let iter = Reader::new(
            trailer.offsets.index_block_ptr,
            table,
            (0, segment_id).into(),
            block_cache,
            BlockOffset(0),
            None,
        );

        assert_eq!(ITEM_COUNT, iter.count() as u64);

        Ok(())
    }

    /// MVCC round-trip: every version of every key is retained, but
    /// `key_count` only counts distinct user keys
    #[test]
    fn segment_writer_write_read_mvcc() -> crate::Result<()> {
        const ITEM_COUNT: u64 = 1_000;
        const VERSION_COUNT: u64 = 5;

        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder: folder.clone(),
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?;

        for key in 0u64..ITEM_COUNT {
            // NOTE: Descending seqnos per key match the sort order `write` expects
            for seqno in (0..VERSION_COUNT).rev() {
                let value = InternalValue::from_components(
                    key.to_be_bytes(),
                    nanoid::nanoid!().as_bytes(),
                    seqno,
                    ValueType::Value,
                );

                writer.write(value)?;
            }
        }

        let trailer = writer.finish()?.expect("should exist");

        assert_eq!(ITEM_COUNT * VERSION_COUNT, trailer.metadata.item_count);
        assert_eq!(ITEM_COUNT, trailer.metadata.key_count);

        assert!(*trailer.offsets.bloom_ptr > 0);

        let segment_file_path = folder.join(segment_id.to_string());

        let table = Arc::new(FileDescriptorTable::new(512, 1));
        table.insert(segment_file_path, (0, segment_id).into());

        let block_cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let iter = Reader::new(
            trailer.offsets.index_block_ptr,
            table,
            (0, segment_id).into(),
            block_cache,
            BlockOffset(0),
            None,
        );

        assert_eq!(ITEM_COUNT * VERSION_COUNT, iter.count() as u64);

        Ok(())
    }
}