1use std::convert::TryInto;
2use std::num::NonZeroUsize;
3use std::{cmp, io};
4
5use byteorder::{BigEndian, WriteBytesExt};
6
7use crate::block_writer::BlockWriter;
8use crate::compression::{compress, CompressionType};
9use crate::count_write::CountWrite;
10use crate::metadata::{FileVersion, Metadata};
11
/// Block size used when none is configured through [`WriterBuilder::block_size`].
const DEFAULT_BLOCK_SIZE: usize = 8192;
/// Lower bound enforced on user-provided block sizes (see [`WriterBuilder::block_size`]).
const MIN_BLOCK_SIZE: usize = 1024;
14
/// A builder to configure and create a [`Writer`].
pub struct WriterBuilder {
    // Compression algorithm applied to every flushed block (defaults to `None`).
    compression_type: CompressionType,
    // Compression level forwarded to the compressor; its meaning depends on the algorithm.
    compression_level: u32,
    // Interval at which keys are sampled inside a block's internal index;
    // `None` lets `BlockWriter` use its own default.
    index_key_interval: Option<NonZeroUsize>,
    // Number of index levels stacked on top of the base index level.
    index_levels: u8,
    // Estimated uncompressed size at which a block is flushed
    // (clamped to at least `MIN_BLOCK_SIZE`).
    block_size: usize,
}
23
24impl Default for WriterBuilder {
25 fn default() -> WriterBuilder {
26 WriterBuilder {
27 compression_type: CompressionType::None,
28 compression_level: 0,
29 index_key_interval: None,
30 index_levels: 0,
31 block_size: DEFAULT_BLOCK_SIZE,
32 }
33 }
34}
35
36impl WriterBuilder {
37 pub fn new() -> WriterBuilder {
40 WriterBuilder::default()
41 }
42
43 pub fn compression_type(&mut self, compression_type: CompressionType) -> &mut Self {
45 self.compression_type = compression_type;
46 self
47 }
48
49 pub fn compression_level(&mut self, level: u32) -> &mut Self {
52 self.compression_level = level;
53 self
54 }
55
56 pub fn block_size(&mut self, size: usize) -> &mut Self {
61 self.block_size = cmp::max(MIN_BLOCK_SIZE, size);
62 self
63 }
64
65 pub fn index_key_interval(&mut self, interval: NonZeroUsize) -> &mut Self {
68 self.index_key_interval = Some(interval);
69 self
70 }
71
72 pub fn index_levels(&mut self, levels: u8) -> &mut Self {
81 self.index_levels = levels;
82 self
83 }
84
85 pub fn build<W: io::Write>(&self, writer: W) -> Writer<W> {
87 let mut block_writer_builder = BlockWriter::builder();
88 if let Some(interval) = self.index_key_interval {
89 block_writer_builder.index_key_interval(interval);
90 }
91
92 let mut index_block_writer_builder = BlockWriter::builder();
93 if let Some(interval) = self.index_key_interval {
94 index_block_writer_builder.index_key_interval(interval);
95 }
96 let index_block_writer = index_block_writer_builder.build();
97
98 Writer {
99 block_writer: block_writer_builder.build(),
100 index_block_writers: vec![index_block_writer; self.index_levels as usize + 1],
101 compression_type: self.compression_type,
102 compression_level: self.compression_level,
103 block_size: self.block_size,
104 entries_count: 0,
105 writer: CountWrite::new(writer),
106 }
107 }
108
109 pub fn memory(&self) -> Writer<Vec<u8>> {
111 self.build(Vec::new())
112 }
113}
114
/// Writes key-value entries as length-prefixed, optionally compressed blocks,
/// followed by one or more levels of index blocks and trailing [`Metadata`].
pub struct Writer<W> {
    // Accumulates the current data block until it reaches `block_size`.
    block_writer: BlockWriter,
    // One index writer per level; the last element is the bottommost level
    // (its entries point at data blocks) and element 0 is the topmost.
    index_block_writers: Vec<BlockWriter>,
    // Compression applied to every flushed block.
    compression_type: CompressionType,
    // Level forwarded to the compressor.
    compression_level: u32,
    // Estimated uncompressed size at which blocks are flushed.
    block_size: usize,
    // Total number of inserted entries, recorded in the trailing metadata.
    entries_count: u64,
    // Destination writer, wrapped to track the current byte offset.
    writer: CountWrite<W>,
}
134
135impl Writer<Vec<u8>> {
136 pub fn memory() -> Writer<Vec<u8>> {
138 WriterBuilder::new().memory()
139 }
140}
141
142impl Writer<()> {
143 pub fn builder() -> WriterBuilder {
145 WriterBuilder::default()
146 }
147}
148
impl<W: io::Write> AsRef<W> for Writer<W> {
    /// Gives read access to the underlying writer (the bytes flushed so far).
    fn as_ref(&self) -> &W {
        self.writer.as_ref()
    }
}
155
impl<W: io::Write> Writer<W> {
    /// Creates a [`Writer`] with default parameters that writes into the given `io::Write`.
    pub fn new(writer: W) -> Writer<W> {
        WriterBuilder::new().build(writer)
    }

    /// Inserts a key-value entry into the current data block.
    ///
    /// When the current data block's estimated size reaches `block_size`, it is
    /// compressed and flushed to the underlying writer and its last key is
    /// recorded in the bottommost index level; index levels that became full in
    /// turn cascade upward the same way.
    ///
    /// NOTE(review): any key-ordering requirement is presumably enforced by
    /// `BlockWriter::insert` — not visible from here, confirm there.
    pub fn insert<A, B>(&mut self, key: A, val: B) -> io::Result<()>
    where
        A: AsRef<[u8]>,
        B: AsRef<[u8]>,
    {
        self.block_writer.insert(key.as_ref(), val.as_ref());
        self.entries_count += 1;

        if self.block_writer.current_size_estimate() >= self.block_size {
            if let Some(last_key) = self.block_writer.last_key() {
                if let Some(index_block_writer) = self.index_block_writers.last_mut() {
                    // Capture the offset *before* writing: the index entry maps
                    // the block's last key to where the block starts.
                    let offset = self.writer.count();
                    index_block_writer.insert(last_key, &offset.to_be_bytes());

                    compress_and_write_block(
                        &mut self.writer,
                        &mut self.block_writer,
                        self.compression_type,
                        self.compression_level,
                    )?;
                }

                // Walk the index levels from the bottom (last element) upward,
                // flushing any index block that grew past `block_size` and
                // registering it in the level just above. The topmost level
                // (element 0) is excluded by the `[1..]` slice: it has no
                // parent and is only flushed in `into_inner`.
                let mut index_block_writers = &mut self.index_block_writers.as_mut_slice()[1..];
                while let Some((last_block_writer, head)) = index_block_writers.split_last_mut() {
                    if last_block_writer.current_size_estimate() >= self.block_size {
                        if let Some(last_key) = last_block_writer.last_key() {
                            if let Some(index_block_writer) = head.last_mut() {
                                let offset = self.writer.count();
                                index_block_writer.insert(last_key, &offset.to_be_bytes());

                                compress_and_write_block(
                                    &mut self.writer,
                                    last_block_writer,
                                    self.compression_type,
                                    self.compression_level,
                                )?;
                            }
                        }
                    }

                    index_block_writers = head;
                }
            }
        }

        Ok(())
    }

    /// Consumes the [`Writer`], flushing everything, and drops the underlying writer.
    pub fn finish(self) -> io::Result<()> {
        self.into_inner().map(drop)
    }

    /// Consumes the [`Writer`], flushing the pending data block and every
    /// index level, appends the trailing [`Metadata`], and returns the
    /// underlying writer.
    pub fn into_inner(mut self) -> io::Result<W> {
        // Flush the in-progress data block, if non-empty, and index it at the
        // bottommost level.
        if let Some(last_key) = self.block_writer.last_key() {
            if let Some(index_block_writer) = self.index_block_writers.last_mut() {
                let offset = self.writer.count();
                index_block_writer.insert(last_key, &offset.to_be_bytes());

                compress_and_write_block(
                    &mut self.writer,
                    &mut self.block_writer,
                    self.compression_type,
                    self.compression_level,
                )?;
            }
        }

        // Flush the index levels bottom-up. After the loop this variable holds
        // the offset of the topmost index block, which the metadata records as
        // the entry point for readers.
        let mut index_block_offset = self.writer.count();
        let mut index_block_writers = self.index_block_writers.as_mut_slice();
        while let Some((last_block_writer, head)) = index_block_writers.split_last_mut() {
            index_block_offset = self.writer.count();

            match last_block_writer.last_key() {
                Some(last_key) => {
                    // Register this level's block in the level above, when one exists.
                    if let Some(pre_last_block_writer) = head.last_mut() {
                        pre_last_block_writer.insert(last_key, &index_block_offset.to_be_bytes());
                    }

                    compress_and_write_block(
                        &mut self.writer,
                        last_block_writer,
                        self.compression_type,
                        self.compression_level,
                    )?;
                }
                None => {
                    // Only the topmost level (empty `head`) is written out even
                    // when empty, so that `index_block_offset` always points at
                    // a valid block.
                    if head.is_empty() {
                        compress_and_write_block(
                            &mut self.writer,
                            last_block_writer,
                            self.compression_type,
                            self.compression_level,
                        )?;
                    }
                }
            }

            index_block_writers = head;
        }

        let metadata = Metadata {
            file_version: FileVersion::FormatV2,
            index_block_offset,
            compression_type: self.compression_type,
            entries_count: self.entries_count,
            // The base index level is not counted as a "level", hence the `- 1`.
            index_levels: self.index_block_writers.len() as u8 - 1,
        };

        metadata.write_into(&mut self.writer)?;
        self.writer.into_inner()
    }
}
299
300fn compress_and_write_block<W: io::Write>(
302 mut writer: W,
303 block_writer: &mut BlockWriter,
304 compression_type: CompressionType,
305 compression_level: u32,
306) -> io::Result<()> {
307 let buffer = block_writer.finish();
308
309 let buffer = compress(compression_type, compression_level, buffer.as_ref())?;
311 let block_len = buffer.len().try_into().unwrap();
312 writer.write_u64::<BigEndian>(block_len)?;
313 writer.write_all(&buffer)?;
314
315 Ok(())
316}
317
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::Reader;

    // Writing 2000 entries without compression must produce non-empty output.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn no_compression() {
        let wb = Writer::builder();
        let mut writer = wb.build(Vec::new());

        for x in 0..2000u32 {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);
    }

    // Same as `no_compression`, with two extra index levels stacked on top.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn no_compression_index_levels_2() {
        let mut wb = Writer::builder();
        wb.index_levels(2);
        let mut writer = wb.build(Vec::new());

        for x in 0..2000u32 {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);
    }

    // Snappy-compressed output must also be non-empty.
    #[test]
    #[cfg_attr(miri, ignore)]
    #[cfg(feature = "snappy")]
    fn snappy_compression() {
        let mut wb = Writer::builder();
        wb.compression_type(CompressionType::Snappy);
        let mut writer = wb.build(Vec::new());

        for x in 0..2000u32 {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);
    }

    // Files written by grenad 0.4 with Snappy compression must remain readable
    // by the current `Reader`: both sequential iteration and key seeking must
    // see every entry.
    #[test]
    #[cfg_attr(miri, ignore)]
    #[cfg(feature = "snappy")]
    fn backward_compatibility_0_4_snappy_compression() {
        let mut writer = grenad_0_4::Writer::builder()
            .compression_type(grenad_0_4::CompressionType::Snappy)
            .memory();

        let total: u32 = 1_500;

        for x in 0..total {
            let x = x.to_be_bytes();
            writer.insert(x, x).unwrap();
        }

        let bytes = writer.into_inner().unwrap();
        assert_ne!(bytes.len(), 0);

        // Read the 0.4-format bytes back with the current Reader.
        let reader = Reader::new(Cursor::new(bytes.as_slice())).unwrap();
        let mut cursor = reader.into_cursor().unwrap();
        let mut x = 0u32;

        // Sequential scan: keys and values must match the running counter.
        while let Some((k, v)) = cursor.move_on_next().unwrap() {
            let k = k.try_into().map(u32::from_be_bytes).unwrap();
            let v = v.try_into().map(u32::from_be_bytes).unwrap();
            assert_eq!(k, x);
            assert_eq!(v, x);
            x += 1;
        }

        // Random access: seeking to each existing key must land exactly on it.
        for x in 0..total {
            let (k, v) =
                cursor.move_on_key_greater_than_or_equal_to(x.to_be_bytes()).unwrap().unwrap();
            let k = k.try_into().map(u32::from_be_bytes).unwrap();
            let v = v.try_into().map(u32::from_be_bytes).unwrap();
            assert_eq!(k, x);
            assert_eq!(v, x);
        }

        assert_eq!(x, total);
    }
}