1mod meta;
6
7use super::{
8 block::{header::Header as BlockHeader, offset::BlockOffset},
9 block_index::writer::Writer as IndexWriter,
10 file_offsets::FileOffsets,
11 meta::{CompressionType, Metadata},
12 trailer::SegmentFileTrailer,
13 value_block::ValueBlock,
14};
15use crate::{
16 bloom::BloomFilter,
17 coding::Encode,
18 file::fsync_directory,
19 segment::block::ItemSize,
20 value::{InternalValue, UserKey},
21 SegmentId,
22};
23use std::{
24 fs::File,
25 io::{BufWriter, Seek, Write},
26 path::PathBuf,
27};
28
/// Streaming writer for a single segment file.
///
/// Items are buffered into `chunk` until `data_block_size` is reached,
/// then spilled as one data block; `finish` appends index, Bloom filter,
/// metadata and trailer.
pub struct Writer {
    /// Options this writer was constructed with (folder, block sizes, segment ID).
    pub(crate) opts: Options,

    /// Compression applied to data blocks (also forwarded to the index writer).
    compression: CompressionType,

    /// Path of the segment file being written (`<folder>/<segment_id>`).
    segment_file_path: PathBuf,

    /// Buffered writer over the segment file.
    block_writer: BufWriter<File>,

    /// Writer for the block index (maps block end keys to block offsets).
    index_writer: IndexWriter,

    /// Items accumulated for the current, not-yet-flushed data block.
    chunk: Vec<InternalValue>,

    /// Accumulated serialized size of `chunk`; a spill is triggered once it
    /// reaches `opts.data_block_size`.
    chunk_size: usize,

    /// Running segment statistics (item/key counts, seqno range, file position, ...).
    pub(crate) meta: meta::Metadata,

    /// (previous, current) data block offsets; `.0` is passed into block
    /// serialization — presumably a back-link to the previous block; confirm
    /// against `ValueBlock::to_bytes_compressed`.
    prev_pos: (BlockOffset, BlockOffset),

    /// Last user key seen, used to count each distinct key exactly once.
    current_key: Option<UserKey>,

    /// Policy deciding how (and whether) the Bloom filter is sized.
    bloom_policy: BloomConstructionPolicy,

    /// Key hashes buffered until `finish`, where the Bloom filter is built
    /// in one pass once the exact key count is known.
    bloom_hash_buffer: Vec<(u64, u64)>,
}
63
/// Controls how the per-segment Bloom filter is dimensioned.
#[derive(Copy, Clone, Debug)]
pub enum BloomConstructionPolicy {
    /// Allocate a fixed number of bits per distinct key.
    /// A value of 0 disables the Bloom filter entirely (see `is_active`).
    BitsPerKey(u8),
    /// Size the filter to hit a target false-positive rate.
    FpRate(f32),
}
69
70impl Default for BloomConstructionPolicy {
71 fn default() -> Self {
72 Self::BitsPerKey(10)
73 }
74}
75
76impl BloomConstructionPolicy {
77 #[must_use]
78 pub fn build(&self, n: usize) -> BloomFilter {
79 match self {
80 Self::BitsPerKey(bpk) => BloomFilter::with_bpk(n, *bpk),
81 Self::FpRate(fpr) => BloomFilter::with_fp_rate(n, *fpr),
82 }
83 }
84
85 #[must_use]
86 pub fn is_active(&self) -> bool {
87 match self {
88 Self::BitsPerKey(bpk) => *bpk > 0,
89 Self::FpRate(_) => true,
90 }
91 }
92}
93
/// Configuration for a segment [`Writer`].
pub struct Options {
    /// Folder the segment file is created in; fsynced on `finish`.
    pub folder: PathBuf,
    /// Spill threshold for data blocks, in bytes (uncompressed item sizes).
    pub data_block_size: u32,
    /// Target size of index blocks, in bytes.
    pub index_block_size: u32,
    /// Segment ID; used as the file name inside `folder`.
    pub segment_id: SegmentId,
}
100
impl Writer {
    /// Creates a new segment writer targeting `<opts.folder>/<opts.segment_id>`.
    ///
    /// Compression defaults to `None` and the Bloom policy to its default;
    /// both can be changed with the builder-style methods below.
    ///
    /// # Errors
    ///
    /// Returns an error if the segment file or the index writer cannot be created.
    pub fn new(opts: Options) -> crate::Result<Self> {
        let segment_file_path = opts.folder.join(opts.segment_id.to_string());

        let block_writer = File::create(&segment_file_path)?;
        // ~64 KiB write buffer to batch block writes into fewer syscalls
        let block_writer = BufWriter::with_capacity(u16::MAX.into(), block_writer);

        let index_writer = IndexWriter::new(opts.index_block_size)?;

        let chunk = Vec::new();

        Ok(Self {
            opts,
            meta: meta::Metadata::default(),

            compression: CompressionType::None,

            segment_file_path,

            block_writer,
            index_writer,
            chunk,

            prev_pos: (BlockOffset(0), BlockOffset(0)),

            chunk_size: 0,

            current_key: None,

            bloom_policy: BloomConstructionPolicy::default(),

            bloom_hash_buffer: Vec::new(),
        })
    }

    /// Sets the compression type for data blocks and the index writer.
    #[must_use]
    pub(crate) fn use_compression(mut self, compression: CompressionType) -> Self {
        self.compression = compression;
        self.index_writer = self.index_writer.use_compression(compression);
        self
    }

    /// Sets the Bloom filter construction policy.
    #[must_use]
    pub(crate) fn use_bloom_policy(mut self, bloom_policy: BloomConstructionPolicy) -> Self {
        self.bloom_policy = bloom_policy;
        self
    }

    /// Flushes the buffered chunk as one data block into the segment file.
    ///
    /// No-op when the chunk is empty. Registers the block in the index,
    /// advances the file position / block offsets, and updates item counts
    /// and `last_key`.
    pub(crate) fn spill_block(&mut self) -> crate::Result<()> {
        // Nothing buffered -> nothing to write
        let Some(last) = self.chunk.last() else {
            return Ok(());
        };

        let (header, data) =
            ValueBlock::to_bytes_compressed(&self.chunk, self.prev_pos.0, self.compression)?;

        self.meta.uncompressed_size += u64::from(header.uncompressed_length);

        // Block layout: header, then (possibly compressed) payload
        header.encode_into(&mut self.block_writer)?;

        self.block_writer.write_all(&data)?;

        let bytes_written = (BlockHeader::serialized_len() + data.len()) as u64;

        // Register the block under its last key at its start offset
        // (file_pos is advanced only afterwards); input is expected to
        // arrive in sorted key order — confirm with callers.
        self.index_writer
            .register_block(last.key.user_key.clone(), self.meta.file_pos)?;

        self.meta.file_pos += bytes_written;
        self.meta.item_count += self.chunk.len();
        self.meta.data_block_count += 1;

        // Shift the (previous, current) block offset pair forward
        self.prev_pos.0 = self.prev_pos.1;
        self.prev_pos.1 += bytes_written;

        self.meta.last_key = Some(
            // Cannot fail: chunk was verified non-empty above
            #[allow(clippy::expect_used)]
            self.chunk
                .pop()
                .expect("chunk should not be empty")
                .key
                .user_key,
        );

        self.chunk.clear();
        self.chunk_size = 0;

        Ok(())
    }

    /// Writes a single item into the segment.
    ///
    /// Updates tombstone/key counts, buffers the key hash for the Bloom
    /// filter (once per distinct key), tracks the seqno range, and spills a
    /// data block once the chunk reaches `data_block_size`.
    ///
    /// NOTE(review): correctness of key counting and block indexing appears
    /// to rely on items arriving in sorted key order — confirm with callers.
    pub fn write(&mut self, item: InternalValue) -> crate::Result<()> {
        if item.is_tombstone() {
            self.meta.tombstone_count += 1;
        }

        // New distinct user key?
        if Some(&item.key.user_key) != self.current_key.as_ref() {
            self.meta.key_count += 1;
            self.current_key = Some(item.key.user_key.clone());

            // Hash once per distinct key; filter itself is built in finish()
            if self.bloom_policy.is_active() {
                self.bloom_hash_buffer
                    .push(BloomFilter::get_hash(&item.key.user_key));
            }
        }

        // Save seqno before the item is moved into the chunk
        let seqno = item.key.seqno;

        if self.meta.first_key.is_none() {
            self.meta.first_key = Some(item.key.user_key.clone());
        }

        self.chunk_size += item.size();
        self.chunk.push(item);

        if self.chunk_size >= self.opts.data_block_size as usize {
            self.spill_block()?;
        }

        self.meta.lowest_seqno = self.meta.lowest_seqno.min(seqno);
        self.meta.highest_seqno = self.meta.highest_seqno.max(seqno);

        Ok(())
    }

    /// Finalizes the segment file.
    ///
    /// Flushes the last data block, then appends in order: block index,
    /// top-level index, Bloom filter (if any hashes were collected),
    /// metadata, and the trailer containing all section offsets. Finally
    /// flushes + fsyncs the file and fsyncs the containing folder.
    ///
    /// Returns `Ok(None)` — and removes the file — if no items were written.
    ///
    /// # Errors
    ///
    /// Returns an error on any I/O or encoding failure.
    pub fn finish(&mut self) -> crate::Result<Option<SegmentFileTrailer>> {
        self.spill_block()?;

        // Empty segment: delete the file instead of leaving a stub behind
        if self.meta.item_count == 0 {
            std::fs::remove_file(&self.segment_file_path)?;
            return Ok(None);
        }

        let index_block_ptr = BlockOffset(self.block_writer.stream_position()?);
        log::trace!("index_block_ptr={index_block_ptr}");

        let tli_ptr = self.index_writer.finish(&mut self.block_writer)?;
        log::trace!("tli_ptr={tli_ptr}");

        self.meta.index_block_count = self.index_writer.block_count;

        // Bloom filter section; offset 0 acts as the "no filter" sentinel
        let bloom_ptr = {
            if self.bloom_hash_buffer.is_empty() {
                BlockOffset(0)
            } else {
                let bloom_ptr = self.block_writer.stream_position()?;
                let n = self.bloom_hash_buffer.len();

                log::trace!(
                    "Constructing Bloom filter with {n} entries: {:?}",
                    self.bloom_policy,
                );

                let start = std::time::Instant::now();

                // n is exact now, so the filter can be sized optimally
                let mut filter = self.bloom_policy.build(n);

                // take() drains and frees the hash buffer in one move
                for hash in std::mem::take(&mut self.bloom_hash_buffer) {
                    filter.set_with_hash(hash);
                }

                log::trace!("Built Bloom filter in {:?}", start.elapsed());

                filter.encode_into(&mut self.block_writer)?;

                BlockOffset(bloom_ptr)
            }
        };
        log::trace!("bloom_ptr={bloom_ptr}");

        // Reserved sections, not implemented yet (0 = absent)
        let rf_ptr = BlockOffset(0);
        log::trace!("rf_ptr={rf_ptr}");

        let range_tombstones_ptr = BlockOffset(0);
        log::trace!("range_tombstones_ptr={range_tombstones_ptr}");

        let pfx_ptr = BlockOffset(0);
        log::trace!("pfx_ptr={pfx_ptr}");

        let metadata_ptr = BlockOffset(self.block_writer.stream_position()?);

        let metadata = Metadata::from_writer(self.opts.segment_id, self)?;
        metadata.encode_into(&mut self.block_writer)?;

        let offsets = FileOffsets {
            index_block_ptr,
            tli_ptr,
            bloom_ptr,
            range_filter_ptr: rf_ptr,
            range_tombstones_ptr,
            pfx_ptr,
            metadata_ptr,
        };

        // Trailer goes last so readers can locate every section from the file end
        let trailer = SegmentFileTrailer { metadata, offsets };
        trailer.encode_into(&mut self.block_writer)?;

        // Flush the BufWriter, then fsync file contents and the directory
        // entry so the segment survives a crash
        self.block_writer.flush()?;
        self.block_writer.get_mut().sync_all()?;

        fsync_directory(&self.opts.folder)?;

        log::debug!(
            "Written {} items in {} blocks into new segment file, written {} MiB",
            self.meta.item_count,
            self.meta.data_block_count,
            *self.meta.file_pos / 1_024 / 1_024
        );

        Ok(Some(trailer))
    }
}
352
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;
    use crate::cache::Cache;
    use crate::descriptor_table::FileDescriptorTable;
    use crate::segment::block_index::top_level::TopLevelIndex;
    use crate::segment::reader::Reader;
    use crate::value::{InternalValue, ValueType};
    use std::sync::Arc;
    use test_log::test;

    /// The trailer's seqno range must be the (min, max) over all written items.
    #[test]
    fn segment_writer_seqnos() -> crate::Result<()> {
        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder,
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?;

        writer.write(InternalValue::from_components(
            "a",
            nanoid::nanoid!().as_bytes(),
            7,
            ValueType::Value,
        ))?;
        writer.write(InternalValue::from_components(
            "b",
            nanoid::nanoid!().as_bytes(),
            5,
            ValueType::Value,
        ))?;
        writer.write(InternalValue::from_components(
            "c",
            nanoid::nanoid!().as_bytes(),
            8,
            ValueType::Value,
        ))?;
        writer.write(InternalValue::from_components(
            "d",
            nanoid::nanoid!().as_bytes(),
            10,
            ValueType::Value,
        ))?;

        let trailer = writer.finish()?.expect("should exist");

        // seqnos = (lowest, highest)
        assert_eq!(5, trailer.metadata.seqnos.0);
        assert_eq!(10, trailer.metadata.seqnos.1);

        Ok(())
    }

    /// BitsPerKey(0) must disable the Bloom filter (bloom_ptr stays 0).
    #[test]
    fn segment_writer_zero_bpk() -> crate::Result<()> {
        const ITEM_COUNT: u64 = 100;

        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder,
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?
        .use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));

        // Unique big-endian keys so item_count == key_count
        let items = (0u64..ITEM_COUNT).map(|i| {
            InternalValue::from_components(
                i.to_be_bytes(),
                nanoid::nanoid!().as_bytes(),
                0,
                ValueType::Value,
            )
        });

        for item in items {
            writer.write(item)?;
        }

        let trailer = writer.finish()?.expect("should exist");

        assert_eq!(ITEM_COUNT, trailer.metadata.item_count);
        assert_eq!(ITEM_COUNT, trailer.metadata.key_count);
        // 0 is the "no Bloom filter" sentinel offset
        assert_eq!(trailer.offsets.bloom_ptr, BlockOffset(0));

        Ok(())
    }

    /// Round-trip: write unique keys, then read them all back via Reader,
    /// and check the top-level index agrees with the metadata block count.
    #[test]
    fn segment_writer_write_read() -> crate::Result<()> {
        const ITEM_COUNT: u64 = 100;

        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder: folder.clone(),
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?;

        let items = (0u64..ITEM_COUNT).map(|i| {
            InternalValue::from_components(
                i.to_be_bytes(),
                nanoid::nanoid!().as_bytes(),
                0,
                ValueType::Value,
            )
        });

        for item in items {
            writer.write(item)?;
        }

        let trailer = writer.finish()?.expect("should exist");

        assert_eq!(ITEM_COUNT, trailer.metadata.item_count);
        assert_eq!(ITEM_COUNT, trailer.metadata.key_count);

        // Default policy -> a Bloom filter section must have been written
        assert!(*trailer.offsets.bloom_ptr > 0);

        let segment_file_path = folder.join(segment_id.to_string());

        #[allow(clippy::cast_possible_truncation)]
        {
            let tli = TopLevelIndex::from_file(
                &segment_file_path,
                &trailer.metadata,
                trailer.offsets.tli_ptr,
            )?;

            assert_eq!(tli.len() as u32, trailer.metadata.index_block_count);
        }

        let table = Arc::new(FileDescriptorTable::new(512, 1));
        table.insert(segment_file_path, (0, segment_id).into());

        let block_cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let iter = Reader::new(
            trailer.offsets.index_block_ptr,
            table,
            (0, segment_id).into(),
            block_cache,
            BlockOffset(0),
            None,
        );

        // Every written item must be readable again
        assert_eq!(ITEM_COUNT, iter.count() as u64);

        Ok(())
    }

    /// MVCC round-trip: multiple seqno versions per key; item_count counts
    /// every version, key_count only distinct user keys.
    #[test]
    fn segment_writer_write_read_mvcc() -> crate::Result<()> {
        const ITEM_COUNT: u64 = 1_000;
        const VERSION_COUNT: u64 = 5;

        let folder = tempfile::tempdir()?.into_path();

        let segment_id = 532;

        let mut writer = Writer::new(Options {
            folder: folder.clone(),
            data_block_size: 4_096,
            index_block_size: 4_096,
            segment_id,
        })?;

        // Versions per key written in descending seqno order
        for key in 0u64..ITEM_COUNT {
            for seqno in (0..VERSION_COUNT).rev() {
                let value = InternalValue::from_components(
                    key.to_be_bytes(),
                    nanoid::nanoid!().as_bytes(),
                    seqno,
                    ValueType::Value,
                );

                writer.write(value)?;
            }
        }

        let trailer = writer.finish()?.expect("should exist");

        assert_eq!(ITEM_COUNT * VERSION_COUNT, trailer.metadata.item_count);
        assert_eq!(ITEM_COUNT, trailer.metadata.key_count);

        assert!(*trailer.offsets.bloom_ptr > 0);

        let segment_file_path = folder.join(segment_id.to_string());

        let table = Arc::new(FileDescriptorTable::new(512, 1));
        table.insert(segment_file_path, (0, segment_id).into());

        let block_cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let iter = Reader::new(
            trailer.offsets.index_block_ptr,
            table,
            (0, segment_id).into(),
            block_cache,
            BlockOffset(0),
            None,
        );

        assert_eq!(ITEM_COUNT * VERSION_COUNT, iter.count() as u64);

        Ok(())
    }
}