fjall/partition/
options.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{compaction::Strategy as CompactionStrategy, file::MAGIC_BYTES};
6use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
7use lsm_tree::{CompressionType, TreeType};
8
/// Configuration options for key-value-separated partitions.
///
/// Created via [`KvSeparationOptions::default`] plus the builder methods, and
/// enabled on a partition with [`CreateOptions::with_kv_separation`].
#[derive(Clone, Debug, PartialEq, Eq)]
// The type name ends with the module name ("options"); this is intentional.
#[allow(clippy::module_name_repetitions)]
pub struct KvSeparationOptions {
    /// Compression to use for blobs.
    ///
    /// There is no public setter: the value is copied from the partition's
    /// compression by [`CreateOptions::with_kv_separation`] and kept in sync by
    /// [`CreateOptions::compression`].
    pub(crate) compression: CompressionType,

    /// Blob file (value log segment) target size in bytes
    #[doc(hidden)]
    pub file_target_size: u64,

    /// Key-value separation threshold in bytes
    #[doc(hidden)]
    pub separation_threshold: u32,
}
24
25impl KvSeparationOptions {
26    /// Sets the target size of blob files.
27    ///
28    /// Smaller blob files allow more granular garbage collection
29    /// which allows lower space amp for lower write I/O cost.
30    ///
31    /// Larger blob files decrease the number of files on disk and thus maintenance
32    /// overhead.
33    ///
34    /// Defaults to 128 MiB.
35    #[must_use]
36    pub fn file_target_size(mut self, bytes: u64) -> Self {
37        self.file_target_size = bytes;
38        self
39    }
40
41    /// Sets the key-value separation threshold in bytes.
42    ///
43    /// Smaller value will reduce compaction overhead and thus write amplification,
44    /// at the cost of lower read performance.
45    ///
46    /// Defaults to 1 KiB.
47    #[must_use]
48    pub fn separation_threshold(mut self, bytes: u32) -> Self {
49        self.separation_threshold = bytes;
50        self
51    }
52}
53
54impl Default for KvSeparationOptions {
55    fn default() -> Self {
56        Self {
57            #[cfg(feature = "lz4")]
58            compression: CompressionType::Lz4,
59
60            #[cfg(all(feature = "miniz", not(feature = "lz4")))]
61            compression: CompressionType::Miniz(6),
62
63            #[cfg(not(any(feature = "lz4", feature = "miniz")))]
64            compression: CompressionType::None,
65
66            file_target_size: /* 128 MiB */ 128 * 1_024 * 1_024,
67
68            separation_threshold: /* 1 KiB */ 1_024,
69        }
70    }
71}
72
73impl lsm_tree::coding::Encode for KvSeparationOptions {
74    fn encode_into<W: std::io::Write>(&self, writer: &mut W) -> Result<(), lsm_tree::EncodeError> {
75        self.compression.encode_into(writer)?;
76        writer.write_u64::<BigEndian>(self.file_target_size)?;
77        writer.write_u32::<BigEndian>(self.separation_threshold)?;
78        Ok(())
79    }
80}
81
82impl lsm_tree::coding::Decode for KvSeparationOptions {
83    fn decode_from<R: std::io::Read>(reader: &mut R) -> Result<Self, lsm_tree::DecodeError>
84    where
85        Self: Sized,
86    {
87        let compression = CompressionType::decode_from(reader)?;
88        let file_target_size = reader.read_u64::<BigEndian>()?;
89        let separation_threshold = reader.read_u32::<BigEndian>()?;
90
91        Ok(Self {
92            compression,
93            file_target_size,
94            separation_threshold,
95        })
96    }
97}
98
/// Options to configure a partition
///
/// Persisted via the `lsm_tree::coding::Encode` / `Decode` implementations,
/// prefixed by `MAGIC_BYTES`.
// The type name ends with the module name ("options"); this is intentional.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Debug)]
pub struct CreateOptions {
    /// Maximum size of this partition's memtable - can be changed during runtime
    pub(crate) max_memtable_size: u32,

    /// Block size of data blocks.
    #[doc(hidden)]
    pub data_block_size: u32,

    /// Block size of index blocks.
    #[doc(hidden)]
    pub index_block_size: u32,

    /// Amount of levels of the LSM tree (depth of tree).
    pub(crate) level_count: u8,

    /// Bits per key for levels that are not L0, L1
    ///
    /// Holds `-1` when [`CreateOptions::bloom_filter_bits`] is called with `None`.
    pub(crate) bloom_bits_per_key: i8,

    /// Tree type, see [`TreeType`].
    pub(crate) tree_type: TreeType,

    /// Compression to use.
    pub(crate) compression: CompressionType,

    /// If `true`, journal persistence is handled manually by the user instead
    /// of flushing writes to the OS (see [`CreateOptions::manual_journal_persist`]).
    pub(crate) manual_journal_persist: bool,

    /// Compaction strategy of this partition.
    #[doc(hidden)]
    pub compaction_strategy: CompactionStrategy,

    /// Key-value separation options.
    ///
    /// `None` by default; set by [`CreateOptions::with_kv_separation`] (which also
    /// switches `tree_type` to [`TreeType::Blob`]) or when decoding persisted options.
    pub(crate) kv_separation: Option<KvSeparationOptions>,
}
133
134impl lsm_tree::coding::Encode for CreateOptions {
135    fn encode_into<W: std::io::Write>(&self, writer: &mut W) -> Result<(), lsm_tree::EncodeError> {
136        writer.write_all(MAGIC_BYTES)?;
137
138        writer.write_u8(self.level_count)?;
139        writer.write_u8(self.tree_type.into())?;
140
141        writer.write_u32::<BigEndian>(self.max_memtable_size)?;
142        writer.write_u32::<BigEndian>(self.data_block_size)?;
143        writer.write_u32::<BigEndian>(self.index_block_size)?;
144
145        self.compression.encode_into(writer)?;
146
147        writer.write_u8(u8::from(self.manual_journal_persist))?;
148
149        writer.write_i8(self.bloom_bits_per_key)?;
150
151        // TODO: move into compaction module
152        match &self.compaction_strategy {
153            CompactionStrategy::Leveled(s) => {
154                writer.write_u8(0)?;
155                writer.write_u8(s.l0_threshold)?;
156                writer.write_u8(s.level_ratio)?;
157                writer.write_u32::<BigEndian>(s.target_size)?;
158            }
159            CompactionStrategy::SizeTiered(s) => {
160                writer.write_u8(1)?;
161                writer.write_u8(s.level_ratio)?;
162                writer.write_u32::<BigEndian>(s.base_size)?;
163            }
164            CompactionStrategy::Fifo(s) => {
165                writer.write_u8(2)?;
166                writer.write_u64::<BigEndian>(s.limit)?;
167
168                match s.ttl_seconds {
169                    Some(s) => {
170                        writer.write_u8(1)?;
171                        writer.write_u64::<BigEndian>(s)
172                    }
173                    None => writer.write_u8(0),
174                }?;
175            }
176        }
177
178        match &self.kv_separation {
179            Some(opts) => {
180                writer.write_u8(1)?;
181                opts.encode_into(writer)?;
182            }
183            None => {
184                writer.write_u8(0)?;
185            }
186        }
187
188        Ok(())
189    }
190}
191
192impl lsm_tree::coding::Decode for CreateOptions {
193    fn decode_from<R: std::io::Read>(reader: &mut R) -> Result<Self, lsm_tree::DecodeError>
194    where
195        Self: Sized,
196    {
197        let mut header = [0; MAGIC_BYTES.len()];
198        reader.read_exact(&mut header)?;
199
200        if header != MAGIC_BYTES {
201            return Err(lsm_tree::DecodeError::InvalidHeader(
202                "PartitionCreateOptions",
203            ));
204        }
205
206        let level_count = reader.read_u8()?;
207
208        let tree_type = reader.read_u8()?;
209        let tree_type: TreeType = tree_type
210            .try_into()
211            .map_err(|()| lsm_tree::DecodeError::InvalidTag(("TreeType", tree_type)))?;
212
213        let max_memtable_size = reader.read_u32::<BigEndian>()?;
214        let data_block_size = reader.read_u32::<BigEndian>()?;
215        let index_block_size = reader.read_u32::<BigEndian>()?;
216
217        let compression = CompressionType::decode_from(reader)?;
218
219        let manual_journal_persist = reader.read_u8()? != 0;
220
221        let bloom_bits_per_key = reader.read_i8()?;
222
223        // TODO: move into compaction module
224        let compaction_tag = reader.read_u8()?;
225        let compaction_strategy = match compaction_tag {
226            0 => {
227                let l0_threshold = reader.read_u8()?;
228                let level_ratio = reader.read_u8()?;
229                let target_size = reader.read_u32::<BigEndian>()?;
230
231                CompactionStrategy::Leveled(crate::compaction::Leveled {
232                    l0_threshold,
233                    target_size,
234                    level_ratio,
235                })
236            }
237            1 => {
238                let level_ratio = reader.read_u8()?;
239                let base_size = reader.read_u32::<BigEndian>()?;
240
241                CompactionStrategy::SizeTiered(crate::compaction::SizeTiered {
242                    base_size,
243                    level_ratio,
244                })
245            }
246            2 => {
247                let limit = reader.read_u64::<BigEndian>()?;
248
249                let ttl_tag = reader.read_u8()?;
250                let ttl_seconds = match ttl_tag {
251                    0 => None,
252                    1 => Some(reader.read_u64::<BigEndian>()?),
253                    _ => return Err(lsm_tree::DecodeError::InvalidTag(("TtlSeconds", ttl_tag))),
254                };
255
256                CompactionStrategy::Fifo(crate::compaction::Fifo::new(limit, ttl_seconds))
257            }
258            _ => {
259                return Err(lsm_tree::DecodeError::InvalidTag((
260                    "CompactionStrategy",
261                    compaction_tag,
262                )));
263            }
264        };
265
266        let kv_sep_tag = reader.read_u8()?;
267        let kv_separation = match kv_sep_tag {
268            0 => None,
269            1 => Some(KvSeparationOptions::decode_from(reader)?),
270            _ => {
271                return Err(lsm_tree::DecodeError::InvalidTag((
272                    "KvSeparationOptions",
273                    kv_sep_tag,
274                )));
275            }
276        };
277
278        Ok(Self {
279            max_memtable_size,
280            data_block_size,
281            index_block_size,
282            level_count,
283            bloom_bits_per_key,
284            tree_type,
285            compression,
286            manual_journal_persist,
287            compaction_strategy,
288            kv_separation,
289        })
290    }
291}
292
293impl Default for CreateOptions {
294    fn default() -> Self {
295        let default_tree_config = lsm_tree::Config::default();
296
297        Self {
298            manual_journal_persist: false,
299
300            max_memtable_size: /* 16 MiB */ 16 * 1_024 * 1_024,
301
302            data_block_size: default_tree_config.data_block_size,
303            index_block_size: default_tree_config.index_block_size,
304            bloom_bits_per_key: default_tree_config.bloom_bits_per_key,
305            level_count: default_tree_config.level_count,
306
307            tree_type: TreeType::Standard,
308
309            #[cfg(feature = "lz4")]
310            compression: CompressionType::Lz4,
311
312            #[cfg(all(feature = "miniz", not(feature = "lz4")))]
313            compression: CompressionType::Miniz(6),
314
315            #[cfg(not(any(feature = "lz4", feature = "miniz")))]
316            compression: CompressionType::None,
317
318            kv_separation: None,
319
320            compaction_strategy: CompactionStrategy::default(),
321        }
322    }
323}
324
325impl CreateOptions {
326    /// Sets the bits per key for bloom filters.
327    ///
328    /// More bits per key increases memory usage, but decreases the
329    /// false positive rate of bloom filters, which decreases unnecessary
330    /// read I/O for point reads.
331    ///
332    /// Default = 10 bits
333    #[must_use]
334    #[doc(hidden)]
335    pub fn bloom_filter_bits(mut self, bits: Option<u8>) -> Self {
336        // NOTE: Can simply cast because of the assert above
337        #[allow(clippy::cast_possible_wrap)]
338        if let Some(bits) = bits {
339            assert!(bits <= 20, "bloom filter bits up to 20 are supported");
340            self.bloom_bits_per_key = bits as i8;
341        } else {
342            self.bloom_bits_per_key = -1;
343        }
344        self
345    }
346
347    /// Sets the compression method.
348    ///
349    /// Once set for a partition, this property is not considered in the future.
350    ///
351    /// Default = In order: Lz4 -> Miniz -> None, depending on compilation flags
352    #[must_use]
353    pub fn compression(mut self, compression: CompressionType) -> Self {
354        self.compression = compression;
355
356        if let Some(opts) = &mut self.kv_separation {
357            opts.compression = compression;
358        }
359
360        self
361    }
362
363    /// Sets the compaction strategy.
364    ///
365    /// Default = Leveled
366    #[must_use]
367    pub fn compaction_strategy(mut self, compaction_strategy: CompactionStrategy) -> Self {
368        self.compaction_strategy = compaction_strategy;
369        self
370    }
371
372    /// If `false`, writes will flush data to the operating system.
373    ///
374    /// Default = false
375    ///
376    /// Set to `true` to handle persistence manually, e.g. manually using `PersistMode::SyncData`.
377    #[must_use]
378    pub fn manual_journal_persist(mut self, flag: bool) -> Self {
379        self.manual_journal_persist = flag;
380        self
381    }
382
383    /// Sets the maximum memtable size.
384    ///
385    /// Default = 16 MiB
386    ///
387    /// Recommended size 8 - 64 MiB, depending on how much memory
388    /// is available.
389    ///
390    /// Note that the memory usage may temporarily be `max_memtable_size * flush_worker_count`
391    /// because of parallel flushing.
392    /// Use the keyspace's `max_write_buffer_size` to cap global memory usage.
393    ///
394    /// Conversely, if `max_memtable_size` is larger than 64 MiB,
395    /// it may require increasing the keyspace's `max_write_buffer_size`.
396    #[must_use]
397    pub fn max_memtable_size(mut self, bytes: u32) -> Self {
398        self.max_memtable_size = bytes;
399        self
400    }
401
402    /// Sets the block size.
403    ///
404    /// Once set for a partition, this property is not considered in the future.
405    ///
406    /// Default = 4 KiB
407    ///
408    /// For point read heavy workloads (get) a sensible default is
409    /// somewhere between 4 - 8 KiB, depending on the average value size.
410    ///
411    /// For scan heavy workloads (range, prefix), use 16 - 64 KiB
412    /// which also increases compression efficiency.
413    ///
414    /// # Panics
415    ///
416    /// Panics if the block size is smaller than 1 KiB or larger than 512 KiB.
417    #[must_use]
418    pub fn block_size(mut self, block_size: u32) -> Self {
419        assert!(block_size >= 1_024);
420        assert!(block_size <= 512 * 1_024);
421
422        self.data_block_size = block_size;
423        self.index_block_size = block_size;
424
425        self
426    }
427
428    /// Enables key-value separation for this partition.
429    ///
430    /// Key-value separation is intended for large value scenarios (1 KiB+ per KV).
431    /// Large values will be separated into a log-structured value log, which heavily
432    /// decreases compaction overhead at the cost of slightly higher read latency
433    /// and higher temporary space usage.
434    /// Also, garbage collection for deleted or outdated values becomes lazy, so
435    /// GC needs to be triggered *manually*.
436    /// See <https://fjall-rs.github.io/post/announcing-fjall-2/#key-value-separation> for more information.
437    ///
438    /// Once set for a partition, this property is not considered in the future.
439    ///
440    /// Default = disabled
441    #[must_use]
442    pub fn with_kv_separation(mut self, mut opts: KvSeparationOptions) -> Self {
443        self.tree_type = TreeType::Blob;
444
445        opts.compression = self.compression;
446        self.kv_separation = Some(opts);
447
448        self
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;

    #[test]
    #[allow(clippy::unwrap_used)]
    #[cfg(not(any(feature = "lz4", feature = "miniz")))]
    fn partition_opts_compression_none() {
        let mut c = CreateOptions::default();
        assert_eq!(c.compression, CompressionType::None);
        assert_eq!(c.kv_separation, None);

        c = c.with_kv_separation(KvSeparationOptions::default());
        assert_eq!(
            c.kv_separation.as_ref().unwrap().compression,
            CompressionType::None,
        );

        c = c.compression(CompressionType::None);
        assert_eq!(c.compression, CompressionType::None);
        assert_eq!(c.kv_separation.unwrap().compression, CompressionType::None);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    #[cfg(feature = "lz4")]
    fn partition_opts_compression_default() {
        let mut c = CreateOptions::default();
        assert_eq!(c.compression, CompressionType::Lz4);
        assert_eq!(c.kv_separation, None);

        c = c.with_kv_separation(KvSeparationOptions::default());
        assert_eq!(
            c.kv_separation.as_ref().unwrap().compression,
            CompressionType::Lz4,
        );

        c = c.compression(CompressionType::None);
        assert_eq!(c.compression, CompressionType::None);
        assert_eq!(c.kv_separation.unwrap().compression, CompressionType::None);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    #[cfg(not(feature = "lz4"))]
    #[cfg(feature = "miniz")]
    fn partition_opts_compression_miniz() {
        let mut c = CreateOptions::default();
        assert_eq!(c.compression, CompressionType::Miniz(6));
        assert_eq!(c.kv_separation, None);

        c = c.with_kv_separation(KvSeparationOptions::default());
        assert_eq!(
            c.kv_separation.as_ref().unwrap().compression,
            CompressionType::Miniz(6),
        );

        c = c.compression(CompressionType::None);
        assert_eq!(c.compression, CompressionType::None);
        assert_eq!(c.kv_separation.unwrap().compression, CompressionType::None);
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    #[cfg(all(feature = "miniz", feature = "lz4"))]
    fn partition_opts_compression_all() {
        let mut c = CreateOptions::default();
        assert_eq!(c.compression, CompressionType::Lz4);
        assert_eq!(c.kv_separation, None);

        c = c.with_kv_separation(KvSeparationOptions::default());
        assert_eq!(
            c.kv_separation.as_ref().unwrap().compression,
            CompressionType::Lz4,
        );

        c = c.compression(CompressionType::None);
        assert_eq!(c.compression, CompressionType::None);
        assert_eq!(
            c.kv_separation.as_ref().unwrap().compression,
            CompressionType::None
        );

        c = c.compression(CompressionType::Miniz(3));
        assert_eq!(c.compression, CompressionType::Miniz(3));
        assert_eq!(
            c.kv_separation.unwrap().compression,
            CompressionType::Miniz(3)
        );
    }

    /// The hand-written blob options format must decode back to the same value.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn kv_separation_opts_encode_decode_roundtrip() {
        use lsm_tree::coding::{Decode, Encode};

        let opts = KvSeparationOptions::default()
            .file_target_size(64 * 1_024 * 1_024)
            .separation_threshold(4_096);

        let mut bytes = Vec::new();
        opts.encode_into(&mut bytes).unwrap();

        let decoded = KvSeparationOptions::decode_from(&mut &bytes[..]).unwrap();
        assert_eq!(decoded, opts);
    }

    /// The hand-written partition options format must decode back to the same
    /// options and re-encode to byte-identical output.
    #[test]
    #[allow(clippy::unwrap_used)]
    fn partition_opts_encode_decode_roundtrip() {
        use lsm_tree::coding::{Decode, Encode};

        let opts = CreateOptions::default()
            .max_memtable_size(8 * 1_024 * 1_024)
            .block_size(16 * 1_024)
            .bloom_filter_bits(Some(7))
            .manual_journal_persist(true)
            .with_kv_separation(
                KvSeparationOptions::default()
                    .file_target_size(64 * 1_024 * 1_024)
                    .separation_threshold(4_096),
            );

        let mut bytes = Vec::new();
        opts.encode_into(&mut bytes).unwrap();

        let decoded = CreateOptions::decode_from(&mut &bytes[..]).unwrap();
        assert_eq!(decoded.max_memtable_size, opts.max_memtable_size);
        assert_eq!(decoded.data_block_size, opts.data_block_size);
        assert_eq!(decoded.index_block_size, opts.index_block_size);
        assert_eq!(decoded.level_count, opts.level_count);
        assert_eq!(decoded.bloom_bits_per_key, 7);
        assert!(matches!(decoded.tree_type, TreeType::Blob));
        assert_eq!(decoded.compression, opts.compression);
        assert!(decoded.manual_journal_persist);
        assert_eq!(decoded.kv_separation, opts.kv_separation);

        // Covers fields without `PartialEq` (e.g. the compaction strategy).
        let mut reencoded = Vec::new();
        decoded.encode_into(&mut reencoded).unwrap();
        assert_eq!(reencoded, bytes);
    }

    #[test]
    fn partition_opts_decode_rejects_garbage() {
        use lsm_tree::coding::Decode;

        let garbage = [0xFF_u8; 64];
        assert!(CreateOptions::decode_from(&mut &garbage[..]).is_err());
    }
}