grenad/
writer.rs

1use std::convert::TryInto;
2use std::num::NonZeroUsize;
3use std::{cmp, io};
4
5use byteorder::{BigEndian, WriteBytesExt};
6
7use crate::block_writer::BlockWriter;
8use crate::compression::{compress, CompressionType};
9use crate::count_write::CountWrite;
10use crate::metadata::{FileVersion, Metadata};
11
/// Block size, in bytes, used by a [`WriterBuilder`] when none is configured.
const DEFAULT_BLOCK_SIZE: usize = 8192;
/// Lower bound applied to any size requested through [`WriterBuilder::block_size`].
const MIN_BLOCK_SIZE: usize = 1024;
14
/// A struct that is used to configure a [`Writer`].
///
/// The setters take and return `&mut Self` so that calls can be chained; the
/// [`Writer`] itself is produced by [`WriterBuilder::build`] or [`WriterBuilder::memory`].
pub struct WriterBuilder {
    /// Compression method handed to the [`Writer`] for its blocks.
    compression_type: CompressionType,
    /// Compression level handed to the [`Writer`] alongside `compression_type`.
    compression_level: u32,
    /// Optional key interval forwarded to the block writer builders;
    /// when `None` the block writer builders are left untouched.
    index_key_interval: Option<NonZeroUsize>,
    /// Number of index levels; `build` creates `index_levels + 1` index block writers.
    index_levels: u8,
    /// Size estimate, in bytes, a block must reach before it is dumped.
    block_size: usize,
}
23
24impl Default for WriterBuilder {
25    fn default() -> WriterBuilder {
26        WriterBuilder {
27            compression_type: CompressionType::None,
28            compression_level: 0,
29            index_key_interval: None,
30            index_levels: 0,
31            block_size: DEFAULT_BLOCK_SIZE,
32        }
33    }
34}
35
36impl WriterBuilder {
37    /// Creates a [`WriterBuilder`], it can be used to
38    /// configure your [`Writer`] to better fit your needs.
39    pub fn new() -> WriterBuilder {
40        WriterBuilder::default()
41    }
42
43    /// Defines the [`CompressionType`] that will be used to compress the writer blocks.
44    pub fn compression_type(&mut self, compression_type: CompressionType) -> &mut Self {
45        self.compression_type = compression_type;
46        self
47    }
48
49    /// Defines the copression level of the defined [`CompressionType`]
50    /// that will be used to compress the writer blocks.
51    pub fn compression_level(&mut self, level: u32) -> &mut Self {
52        self.compression_level = level;
53        self
54    }
55
56    /// Defines the size of the blocks that the writer will writer.
57    ///
58    /// The bigger the blocks are the better they are compressed
59    /// but the more time it takes to compress and decompress them.
60    pub fn block_size(&mut self, size: usize) -> &mut Self {
61        self.block_size = cmp::max(MIN_BLOCK_SIZE, size);
62        self
63    }
64
65    /// The interval at which we store the index of a key in the
66    /// index footer, used to seek into a block.
67    pub fn index_key_interval(&mut self, interval: NonZeroUsize) -> &mut Self {
68        self.index_key_interval = Some(interval);
69        self
70    }
71
72    /// The number of levels/indirection we will use to write the index footer.
73    ///
74    /// An indirection of 1 or 2 is sufficient to reduce the impact of
75    /// decompressing/reading the index block footer.
76    ///
77    /// The default is 0 which means that the index block footer values directly specifies
78    /// the block where the requested data entries can be found. The disavantage of this
79    /// is that the index block can be quite big and take time to be decompressed and read.
80    pub fn index_levels(&mut self, levels: u8) -> &mut Self {
81        self.index_levels = levels;
82        self
83    }
84
85    /// Creates the [`Writer`] that will write into the provided [`io::Write`] type.
86    pub fn build<W: io::Write>(&self, writer: W) -> Writer<W> {
87        let mut block_writer_builder = BlockWriter::builder();
88        if let Some(interval) = self.index_key_interval {
89            block_writer_builder.index_key_interval(interval);
90        }
91
92        let mut index_block_writer_builder = BlockWriter::builder();
93        if let Some(interval) = self.index_key_interval {
94            index_block_writer_builder.index_key_interval(interval);
95        }
96        let index_block_writer = index_block_writer_builder.build();
97
98        Writer {
99            block_writer: block_writer_builder.build(),
100            index_block_writers: vec![index_block_writer; self.index_levels as usize + 1],
101            compression_type: self.compression_type,
102            compression_level: self.compression_level,
103            block_size: self.block_size,
104            entries_count: 0,
105            writer: CountWrite::new(writer),
106        }
107    }
108
109    /// Creates the [`Writer`] that will write into a [`Vec`] of bytes.
110    pub fn memory(&self) -> Writer<Vec<u8>> {
111        self.build(Vec::new())
112    }
113}
114
/// A struct you can use to write entries into any [`io::Write`] type,
/// entries must be inserted in key-order.
///
/// Entries are gathered into blocks; once a block reaches `block_size` it is
/// compressed and written, and its offset is recorded in an index block writer.
pub struct Writer<W> {
    /// The block writer that is currently storing the key/values entries.
    block_writer: BlockWriter,
    /// The block writers that associate the offset (big endian u64) of the
    /// blocks in the file with the last key of these given blocks.
    /// The first one is the main index block, which is only written at the end.
    index_block_writers: Vec<BlockWriter>,
    /// The compression method used to compress individual blocks.
    compression_type: CompressionType,
    /// The compression level used to compress individual blocks.
    compression_level: u32,
    /// The number of bytes a block must reach before it is dumped to the writer.
    block_size: usize,
    /// The number of entries inserted so far.
    entries_count: u64,
    /// The writer in which we write the blocks, index footer blocks and footer metadata.
    /// It also counts the bytes written, which gives the offset of the next block.
    writer: CountWrite<W>,
}
134
135impl Writer<Vec<u8>> {
136    /// Creates a [`Writer`] that will write into a [`Vec`] of bytes.
137    pub fn memory() -> Writer<Vec<u8>> {
138        WriterBuilder::new().memory()
139    }
140}
141
142impl Writer<()> {
143    /// Creates a [`WriterBuilder`], it can be used to configure your [`Writer`].
144    pub fn builder() -> WriterBuilder {
145        WriterBuilder::default()
146    }
147}
148
149impl<W: io::Write> AsRef<W> for Writer<W> {
150    /// Gets a reference to the underlying writer.
151    fn as_ref(&self) -> &W {
152        self.writer.as_ref()
153    }
154}
155
156impl<W: io::Write> Writer<W> {
157    /// Creates a [`Writer`] that will write into the provided [`io::Write`] type.
158    pub fn new(writer: W) -> Writer<W> {
159        WriterBuilder::new().build(writer)
160    }
161
162    /// Writes the provided entry into the underlying [`io::Write`] type,
163    /// key-values must be given in key-order.
164    pub fn insert<A, B>(&mut self, key: A, val: B) -> io::Result<()>
165    where
166        A: AsRef<[u8]>,
167        B: AsRef<[u8]>,
168    {
169        self.block_writer.insert(key.as_ref(), val.as_ref());
170        self.entries_count += 1;
171
172        if self.block_writer.current_size_estimate() >= self.block_size {
173            // Only write a block if there is at least a key in it.
174            if let Some(last_key) = self.block_writer.last_key() {
175                if let Some(index_block_writer) = self.index_block_writers.last_mut() {
176                    // Get the current offset and last key of the current block,
177                    // write it in the index block writer.
178                    let offset = self.writer.count();
179                    index_block_writer.insert(last_key, &offset.to_be_bytes());
180
181                    compress_and_write_block(
182                        &mut self.writer,
183                        &mut self.block_writer,
184                        self.compression_type,
185                        self.compression_level,
186                    )?;
187                }
188
189                // We iterate recursively on the index blocks and dumps the blocks that reached
190                // the size limit, saving the offsets in the parent block. We skip the first index
191                // block as it is the main one and must only be dumped at the end.
192                let mut index_block_writers = &mut self.index_block_writers.as_mut_slice()[1..];
193                while let Some((last_block_writer, head)) = index_block_writers.split_last_mut() {
194                    if last_block_writer.current_size_estimate() >= self.block_size {
195                        // Only write a block if there is at least a key in it.
196                        if let Some(last_key) = last_block_writer.last_key() {
197                            if let Some(index_block_writer) = head.last_mut() {
198                                let offset = self.writer.count();
199                                index_block_writer.insert(last_key, &offset.to_be_bytes());
200
201                                compress_and_write_block(
202                                    &mut self.writer,
203                                    last_block_writer,
204                                    self.compression_type,
205                                    self.compression_level,
206                                )?;
207                            }
208                        }
209                    }
210
211                    index_block_writers = head;
212                }
213            }
214        }
215
216        Ok(())
217    }
218
219    /// Consumes this [`Writer`] and write the latest block currently being built.
220    ///
221    /// You must call this method before using the underlying [`io::Write`] type.
222    pub fn finish(self) -> io::Result<()> {
223        self.into_inner().map(drop)
224    }
225
226    /// Consumes this [`Writer`] and write the latest block currenty being built.
227    ///
228    /// Returns the underlying [`io::Write`] provided type.
229    pub fn into_inner(mut self) -> io::Result<W> {
230        // Write the last block only if it is not empty.
231        if let Some(last_key) = self.block_writer.last_key() {
232            if let Some(index_block_writer) = self.index_block_writers.last_mut() {
233                // Get the current offset and last key of the current block,
234                // write it in the index block writer.
235                let offset = self.writer.count();
236                index_block_writer.insert(last_key, &offset.to_be_bytes());
237
238                compress_and_write_block(
239                    &mut self.writer,
240                    &mut self.block_writer,
241                    self.compression_type,
242                    self.compression_level,
243                )?;
244            }
245        }
246
247        // We must write the index block levels to the file.
248        let mut index_block_offset = self.writer.count();
249        let mut index_block_writers = self.index_block_writers.as_mut_slice();
250        while let Some((last_block_writer, head)) = index_block_writers.split_last_mut() {
251            // Get the offset where we are in the file.
252            index_block_offset = self.writer.count();
253
254            match last_block_writer.last_key() {
255                // Write the index block only if it is not empty.
256                Some(last_key) => {
257                    // Get the last_key of the index block we are writing and
258                    // put that last_key into the index block of the level above.
259                    if let Some(pre_last_block_writer) = head.last_mut() {
260                        pre_last_block_writer.insert(last_key, &index_block_offset.to_be_bytes());
261                    }
262
263                    compress_and_write_block(
264                        &mut self.writer,
265                        last_block_writer,
266                        self.compression_type,
267                        self.compression_level,
268                    )?;
269                }
270                // Or if this is the main index block.
271                None => {
272                    if head.is_empty() {
273                        compress_and_write_block(
274                            &mut self.writer,
275                            last_block_writer,
276                            self.compression_type,
277                            self.compression_level,
278                        )?;
279                    }
280                }
281            }
282
283            index_block_writers = head;
284        }
285
286        // Then we can write the metadata that specifies where the index block is stored.
287        let metadata = Metadata {
288            file_version: FileVersion::FormatV2,
289            index_block_offset,
290            compression_type: self.compression_type,
291            entries_count: self.entries_count,
292            index_levels: self.index_block_writers.len() as u8 - 1,
293        };
294
295        metadata.write_into(&mut self.writer)?;
296        self.writer.into_inner()
297    }
298}
299
300/// Compress and write the block into the writer prefixed by the length of it as an `u64`.
301fn compress_and_write_block<W: io::Write>(
302    mut writer: W,
303    block_writer: &mut BlockWriter,
304    compression_type: CompressionType,
305    compression_level: u32,
306) -> io::Result<()> {
307    let buffer = block_writer.finish();
308
309    // Compress, write the length of the compressed block then the block itself.
310    let buffer = compress(compression_type, compression_level, buffer.as_ref())?;
311    let block_len = buffer.len().try_into().unwrap();
312    writer.write_u64::<BigEndian>(block_len)?;
313    writer.write_all(&buffer)?;
314
315    Ok(())
316}
317
#[cfg(test)]
mod tests {
    // `Cursor` and `Reader` are only used by the snappy backward-compatibility
    // test, so they are gated on the same feature to avoid unused-import
    // warnings when the feature is disabled.
    #[cfg(feature = "snappy")]
    use std::io::Cursor;

    use super::*;
    #[cfg(feature = "snappy")]
    use crate::Reader;

    /// Writes 2000 entries without compression and checks that bytes were produced.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn no_compression() {
        let wb = Writer::builder();
        let mut writer = wb.build(Vec::new());

        for x in 0..2000u32 {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);
    }

    /// Same as `no_compression` but with two index levels.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn no_compression_index_levels_2() {
        let mut wb = Writer::builder();
        wb.index_levels(2);
        let mut writer = wb.build(Vec::new());

        for x in 0..2000u32 {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);
    }

    /// Writes 2000 entries with snappy compression and checks that bytes were produced.
    #[test]
    #[cfg_attr(miri, ignore)]
    #[cfg(feature = "snappy")]
    fn snappy_compression() {
        let mut wb = Writer::builder();
        wb.compression_type(CompressionType::Snappy);
        let mut writer = wb.build(Vec::new());

        for x in 0..2000u32 {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);
    }

    /// Writes a snappy-compressed file with the `grenad_0_4` writer and checks
    /// that the current [`Reader`] can iterate over it and seek into it.
    #[test]
    #[cfg_attr(miri, ignore)]
    #[cfg(feature = "snappy")]
    fn backward_compatibility_0_4_snappy_compression() {
        let mut writer = grenad_0_4::Writer::builder()
            .compression_type(grenad_0_4::CompressionType::Snappy)
            .memory();

        let total: u32 = 1_500;

        for x in 0..total {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);

        let reader = Reader::new(Cursor::new(bytes.as_slice())).unwrap();
        let mut cursor = reader.into_cursor().unwrap();
        // Counts the entries seen by the sequential scan.
        let mut x = 0u32;

        while let Some((k, v)) = cursor.move_on_next().unwrap() {
            let k = k.try_into().map(u32::from_be_bytes).unwrap();
            let v = v.try_into().map(u32::from_be_bytes).unwrap();
            assert_eq!(k, x);
            assert_eq!(v, x);
            x += 1;
        }

        // Every key must also be reachable by seeking to it directly.
        for expected in 0..total {
            let (k, v) = cursor
                .move_on_key_greater_than_or_equal_to(expected.to_be_bytes())
                .unwrap()
                .unwrap();
            let k = k.try_into().map(u32::from_be_bytes).unwrap();
            let v = v.try_into().map(u32::from_be_bytes).unwrap();
            assert_eq!(k, expected);
            assert_eq!(v, expected);
        }

        // The sequential scan must have visited every inserted entry.
        assert_eq!(x, total);
    }
}