1use crate::{compaction::Strategy as CompactionStrategy, file::MAGIC_BYTES};
6use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
7use lsm_tree::{CompressionType, TreeType};
8
9#[derive(Clone, Debug, PartialEq, Eq)]
11#[allow(clippy::module_name_repetitions)]
12pub struct KvSeparationOptions {
13 pub(crate) compression: CompressionType,
15
16 #[doc(hidden)]
18 pub file_target_size: u64,
19
20 #[doc(hidden)]
22 pub separation_threshold: u32,
23}
24
25impl KvSeparationOptions {
26 #[must_use]
36 pub fn file_target_size(mut self, bytes: u64) -> Self {
37 self.file_target_size = bytes;
38 self
39 }
40
41 #[must_use]
48 pub fn separation_threshold(mut self, bytes: u32) -> Self {
49 self.separation_threshold = bytes;
50 self
51 }
52}
53
54impl Default for KvSeparationOptions {
55 fn default() -> Self {
56 Self {
57 #[cfg(feature = "lz4")]
58 compression: CompressionType::Lz4,
59
60 #[cfg(all(feature = "miniz", not(feature = "lz4")))]
61 compression: CompressionType::Miniz(6),
62
63 #[cfg(not(any(feature = "lz4", feature = "miniz")))]
64 compression: CompressionType::None,
65
66 file_target_size: 128 * 1_024 * 1_024,
67
68 separation_threshold: 1_024,
69 }
70 }
71}
72
73impl lsm_tree::coding::Encode for KvSeparationOptions {
74 fn encode_into<W: std::io::Write>(&self, writer: &mut W) -> Result<(), lsm_tree::EncodeError> {
75 self.compression.encode_into(writer)?;
76 writer.write_u64::<BigEndian>(self.file_target_size)?;
77 writer.write_u32::<BigEndian>(self.separation_threshold)?;
78 Ok(())
79 }
80}
81
82impl lsm_tree::coding::Decode for KvSeparationOptions {
83 fn decode_from<R: std::io::Read>(reader: &mut R) -> Result<Self, lsm_tree::DecodeError>
84 where
85 Self: Sized,
86 {
87 let compression = CompressionType::decode_from(reader)?;
88 let file_target_size = reader.read_u64::<BigEndian>()?;
89 let separation_threshold = reader.read_u32::<BigEndian>()?;
90
91 Ok(Self {
92 compression,
93 file_target_size,
94 separation_threshold,
95 })
96 }
97}
98
99#[allow(clippy::module_name_repetitions)]
101#[derive(Clone, Debug)]
102pub struct CreateOptions {
103 pub(crate) max_memtable_size: u32,
105
106 #[doc(hidden)]
108 pub data_block_size: u32,
109
110 #[doc(hidden)]
112 pub index_block_size: u32,
113
114 pub(crate) level_count: u8,
116
117 pub(crate) bloom_bits_per_key: i8,
119
120 pub(crate) tree_type: TreeType,
122
123 pub(crate) compression: CompressionType,
125
126 pub(crate) manual_journal_persist: bool,
127
128 #[doc(hidden)]
129 pub compaction_strategy: CompactionStrategy,
130
131 pub(crate) kv_separation: Option<KvSeparationOptions>,
132}
133
134impl lsm_tree::coding::Encode for CreateOptions {
135 fn encode_into<W: std::io::Write>(&self, writer: &mut W) -> Result<(), lsm_tree::EncodeError> {
136 writer.write_all(MAGIC_BYTES)?;
137
138 writer.write_u8(self.level_count)?;
139 writer.write_u8(self.tree_type.into())?;
140
141 writer.write_u32::<BigEndian>(self.max_memtable_size)?;
142 writer.write_u32::<BigEndian>(self.data_block_size)?;
143 writer.write_u32::<BigEndian>(self.index_block_size)?;
144
145 self.compression.encode_into(writer)?;
146
147 writer.write_u8(u8::from(self.manual_journal_persist))?;
148
149 writer.write_i8(self.bloom_bits_per_key)?;
150
151 match &self.compaction_strategy {
153 CompactionStrategy::Leveled(s) => {
154 writer.write_u8(0)?;
155 writer.write_u8(s.l0_threshold)?;
156 writer.write_u8(s.level_ratio)?;
157 writer.write_u32::<BigEndian>(s.target_size)?;
158 }
159 CompactionStrategy::SizeTiered(s) => {
160 writer.write_u8(1)?;
161 writer.write_u8(s.level_ratio)?;
162 writer.write_u32::<BigEndian>(s.base_size)?;
163 }
164 CompactionStrategy::Fifo(s) => {
165 writer.write_u8(2)?;
166 writer.write_u64::<BigEndian>(s.limit)?;
167
168 match s.ttl_seconds {
169 Some(s) => {
170 writer.write_u8(1)?;
171 writer.write_u64::<BigEndian>(s)
172 }
173 None => writer.write_u8(0),
174 }?;
175 }
176 }
177
178 match &self.kv_separation {
179 Some(opts) => {
180 writer.write_u8(1)?;
181 opts.encode_into(writer)?;
182 }
183 None => {
184 writer.write_u8(0)?;
185 }
186 }
187
188 Ok(())
189 }
190}
191
192impl lsm_tree::coding::Decode for CreateOptions {
193 fn decode_from<R: std::io::Read>(reader: &mut R) -> Result<Self, lsm_tree::DecodeError>
194 where
195 Self: Sized,
196 {
197 let mut header = [0; MAGIC_BYTES.len()];
198 reader.read_exact(&mut header)?;
199
200 if header != MAGIC_BYTES {
201 return Err(lsm_tree::DecodeError::InvalidHeader(
202 "PartitionCreateOptions",
203 ));
204 }
205
206 let level_count = reader.read_u8()?;
207
208 let tree_type = reader.read_u8()?;
209 let tree_type: TreeType = tree_type
210 .try_into()
211 .map_err(|()| lsm_tree::DecodeError::InvalidTag(("TreeType", tree_type)))?;
212
213 let max_memtable_size = reader.read_u32::<BigEndian>()?;
214 let data_block_size = reader.read_u32::<BigEndian>()?;
215 let index_block_size = reader.read_u32::<BigEndian>()?;
216
217 let compression = CompressionType::decode_from(reader)?;
218
219 let manual_journal_persist = reader.read_u8()? != 0;
220
221 let bloom_bits_per_key = reader.read_i8()?;
222
223 let compaction_tag = reader.read_u8()?;
225 let compaction_strategy = match compaction_tag {
226 0 => {
227 let l0_threshold = reader.read_u8()?;
228 let level_ratio = reader.read_u8()?;
229 let target_size = reader.read_u32::<BigEndian>()?;
230
231 CompactionStrategy::Leveled(crate::compaction::Leveled {
232 l0_threshold,
233 target_size,
234 level_ratio,
235 })
236 }
237 1 => {
238 let level_ratio = reader.read_u8()?;
239 let base_size = reader.read_u32::<BigEndian>()?;
240
241 CompactionStrategy::SizeTiered(crate::compaction::SizeTiered {
242 base_size,
243 level_ratio,
244 })
245 }
246 2 => {
247 let limit = reader.read_u64::<BigEndian>()?;
248
249 let ttl_tag = reader.read_u8()?;
250 let ttl_seconds = match ttl_tag {
251 0 => None,
252 1 => Some(reader.read_u64::<BigEndian>()?),
253 _ => return Err(lsm_tree::DecodeError::InvalidTag(("TtlSeconds", ttl_tag))),
254 };
255
256 CompactionStrategy::Fifo(crate::compaction::Fifo::new(limit, ttl_seconds))
257 }
258 _ => {
259 return Err(lsm_tree::DecodeError::InvalidTag((
260 "CompactionStrategy",
261 compaction_tag,
262 )));
263 }
264 };
265
266 let kv_sep_tag = reader.read_u8()?;
267 let kv_separation = match kv_sep_tag {
268 0 => None,
269 1 => Some(KvSeparationOptions::decode_from(reader)?),
270 _ => {
271 return Err(lsm_tree::DecodeError::InvalidTag((
272 "KvSeparationOptions",
273 kv_sep_tag,
274 )));
275 }
276 };
277
278 Ok(Self {
279 max_memtable_size,
280 data_block_size,
281 index_block_size,
282 level_count,
283 bloom_bits_per_key,
284 tree_type,
285 compression,
286 manual_journal_persist,
287 compaction_strategy,
288 kv_separation,
289 })
290 }
291}
292
293impl Default for CreateOptions {
294 fn default() -> Self {
295 let default_tree_config = lsm_tree::Config::default();
296
297 Self {
298 manual_journal_persist: false,
299
300 max_memtable_size: 16 * 1_024 * 1_024,
301
302 data_block_size: default_tree_config.data_block_size,
303 index_block_size: default_tree_config.index_block_size,
304 bloom_bits_per_key: default_tree_config.bloom_bits_per_key,
305 level_count: default_tree_config.level_count,
306
307 tree_type: TreeType::Standard,
308
309 #[cfg(feature = "lz4")]
310 compression: CompressionType::Lz4,
311
312 #[cfg(all(feature = "miniz", not(feature = "lz4")))]
313 compression: CompressionType::Miniz(6),
314
315 #[cfg(not(any(feature = "lz4", feature = "miniz")))]
316 compression: CompressionType::None,
317
318 kv_separation: None,
319
320 compaction_strategy: CompactionStrategy::default(),
321 }
322 }
323}
324
325impl CreateOptions {
326 #[must_use]
334 #[doc(hidden)]
335 pub fn bloom_filter_bits(mut self, bits: Option<u8>) -> Self {
336 #[allow(clippy::cast_possible_wrap)]
338 if let Some(bits) = bits {
339 assert!(bits <= 20, "bloom filter bits up to 20 are supported");
340 self.bloom_bits_per_key = bits as i8;
341 } else {
342 self.bloom_bits_per_key = -1;
343 }
344 self
345 }
346
347 #[must_use]
353 pub fn compression(mut self, compression: CompressionType) -> Self {
354 self.compression = compression;
355
356 if let Some(opts) = &mut self.kv_separation {
357 opts.compression = compression;
358 }
359
360 self
361 }
362
363 #[must_use]
367 pub fn compaction_strategy(mut self, compaction_strategy: CompactionStrategy) -> Self {
368 self.compaction_strategy = compaction_strategy;
369 self
370 }
371
372 #[must_use]
378 pub fn manual_journal_persist(mut self, flag: bool) -> Self {
379 self.manual_journal_persist = flag;
380 self
381 }
382
383 #[must_use]
397 pub fn max_memtable_size(mut self, bytes: u32) -> Self {
398 self.max_memtable_size = bytes;
399 self
400 }
401
402 #[must_use]
418 pub fn block_size(mut self, block_size: u32) -> Self {
419 assert!(block_size >= 1_024);
420 assert!(block_size <= 512 * 1_024);
421
422 self.data_block_size = block_size;
423 self.index_block_size = block_size;
424
425 self
426 }
427
428 #[must_use]
442 pub fn with_kv_separation(mut self, mut opts: KvSeparationOptions) -> Self {
443 self.tree_type = TreeType::Blob;
444
445 opts.compression = self.compression;
446 self.kv_separation = Some(opts);
447
448 self
449 }
450}
451
452#[cfg(test)]
453mod tests {
454 use super::*;
455 use test_log::test;
456
457 #[test]
458 #[cfg(not(any(feature = "lz4", feature = "miniz")))]
459 fn partition_opts_compression_none() {
460 let mut c = CreateOptions::default();
461 assert_eq!(c.compression, CompressionType::None);
462 assert_eq!(c.kv_separation, None);
463
464 c = c.with_kv_separation(KvSeparationOptions::default());
465 assert_eq!(
466 c.kv_separation.as_ref().unwrap().compression,
467 CompressionType::None,
468 );
469
470 c = c.compression(CompressionType::None);
471 assert_eq!(c.compression, CompressionType::None);
472 assert_eq!(c.kv_separation.unwrap().compression, CompressionType::None);
473 }
474
475 #[test]
476 #[allow(clippy::unwrap_used)]
477 #[cfg(feature = "lz4")]
478 fn partition_opts_compression_default() {
479 let mut c = CreateOptions::default();
480 assert_eq!(c.compression, CompressionType::Lz4);
481 assert_eq!(c.kv_separation, None);
482
483 c = c.with_kv_separation(KvSeparationOptions::default());
484 assert_eq!(
485 c.kv_separation.as_ref().unwrap().compression,
486 CompressionType::Lz4,
487 );
488
489 c = c.compression(CompressionType::None);
490 assert_eq!(c.compression, CompressionType::None);
491 assert_eq!(c.kv_separation.unwrap().compression, CompressionType::None);
492 }
493
494 #[test]
495 #[allow(clippy::unwrap_used)]
496 #[cfg(not(feature = "lz4"))]
497 #[cfg(feature = "miniz")]
498 fn partition_opts_compression_miniz() {
499 let mut c = CreateOptions::default();
500 assert_eq!(c.compression, CompressionType::Miniz(6));
501 assert_eq!(c.kv_separation, None);
502
503 c = c.with_kv_separation(KvSeparationOptions::default());
504 assert_eq!(
505 c.kv_separation.as_ref().unwrap().compression,
506 CompressionType::Miniz(6),
507 );
508
509 c = c.compression(CompressionType::None);
510 assert_eq!(c.compression, CompressionType::None);
511 assert_eq!(c.kv_separation.unwrap().compression, CompressionType::None);
512 }
513
514 #[test]
515 #[cfg(all(feature = "miniz", feature = "lz4"))]
516 fn partition_opts_compression_all() {
517 let mut c = CreateOptions::default();
518 assert_eq!(c.compression, CompressionType::Lz4);
519 assert_eq!(c.kv_separation, None);
520
521 c = c.with_kv_separation(KvSeparationOptions::default());
522 assert_eq!(
523 c.kv_separation.as_ref().unwrap().compression,
524 CompressionType::Lz4,
525 );
526
527 c = c.compression(CompressionType::None);
528 assert_eq!(c.compression, CompressionType::None);
529 assert_eq!(
530 c.kv_separation.as_ref().unwrap().compression,
531 CompressionType::None
532 );
533
534 c = c.compression(CompressionType::Miniz(3));
535 assert_eq!(c.compression, CompressionType::Miniz(3));
536 assert_eq!(
537 c.kv_separation.unwrap().compression,
538 CompressionType::Miniz(3)
539 );
540 }
541}