1use crate::block_builder::{DataBlockBuilder, DataBlockBuilderOptions, IndexBlockBuilder};
2use crate::block_handle::BlockHandle;
3use crate::error::{Error, Result};
4use crate::footer::Footer;
5use crate::types::{CompressionType, FormatVersion, WriteOptions};
6use byteorder::{LittleEndian, WriteBytesExt};
7use std::fs::File;
8use std::io::{BufWriter, Write};
9use std::path::Path;
10
/// Kind of operation recorded for a key.
///
/// The discriminant is written as the first byte of every encoded value
/// (see `SstFileWriter::encode_entry_value`), so these numeric values end up in the
/// file. They are spelled out explicitly so that reordering the variants cannot
/// silently change the encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntryType {
    /// Entry added through `SstFileWriter::put`.
    Put = 0,
    /// Entry added through `SstFileWriter::delete`; its encoded value carries no payload.
    Delete = 1,
    /// Entry added through `SstFileWriter::merge`.
    Merge = 2,
}
18
19pub struct SstFileWriter {
21 options: WriteOptions,
22 writer: Option<BufWriter<File>>,
23 data_block_builder: DataBlockBuilder,
24 index_block_builder: IndexBlockBuilder,
25 offset: u64,
26 num_entries: u64,
27 last_key: Vec<u8>,
28 finished: bool,
29 pending_index_entry: Option<(Vec<u8>, BlockHandle)>,
30 base_context_checksum: Option<u32>,
31}
32
33impl SstFileWriter {
34 pub fn create(opts: &WriteOptions) -> Self {
36 let base_context_checksum = if opts.format_version >= FormatVersion::V6 {
38 Some(0) } else {
40 None
41 };
42
43 SstFileWriter {
44 options: opts.clone(),
45 writer: None,
46 data_block_builder: DataBlockBuilder::new(
47 DataBlockBuilderOptions::default()
48 .with_restart_interval(opts.block_restart_interval),
49 ),
50 index_block_builder: IndexBlockBuilder::new(opts.block_restart_interval),
51 offset: 0,
52 num_entries: 0,
53 last_key: Vec::new(),
54 finished: false,
55 pending_index_entry: None,
56 base_context_checksum,
57 }
58 }
59
60 pub fn open<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
62 if self.writer.is_some() {
63 return Err(Error::InvalidArgument("File already open".to_string()));
64 }
65
66 let file = File::create(path)?;
67 self.writer = Some(BufWriter::new(file));
68 self.offset = 0;
69 self.num_entries = 0;
70 self.last_key.clear();
71 self.finished = false;
72
73 Ok(())
74 }
75
76 pub fn put<K, V>(&mut self, key: K, value: V) -> Result<()>
78 where
79 K: AsRef<[u8]>,
80 V: AsRef<[u8]>,
81 {
82 self.add_entry(key.as_ref(), value.as_ref(), EntryType::Put)
83 }
84
85 pub fn merge<K, V>(&mut self, key: K, value: V) -> Result<()>
87 where
88 K: AsRef<[u8]>,
89 V: AsRef<[u8]>,
90 {
91 self.add_entry(key.as_ref(), value.as_ref(), EntryType::Merge)
92 }
93
94 pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) -> Result<()> {
96 self.add_entry(key.as_ref(), &[], EntryType::Delete)
97 }
98
99 pub fn finish(&mut self) -> Result<()> {
101 if self.finished {
102 return Err(Error::InvalidArgument("Already finished".to_string()));
103 }
104
105 if self.writer.is_none() {
106 return Err(Error::InvalidArgument("No file open".to_string()));
107 }
108
109 if !self.data_block_builder.empty() {
111 self.flush_data_block()?;
112 }
113
114 let index_block_data = self.index_block_builder.finish(
116 CompressionType::None,
117 self.options.checksum_type,
118 Some(self.offset),
119 self.base_context_checksum,
120 )?;
121 let index_handle = BlockHandle {
122 offset: self.offset,
123 size: index_block_data.len() as u64,
124 };
125
126 let metaindex_offset = self.offset + index_block_data.len() as u64;
127 let metaindex_data = self.create_empty_metaindex_block(metaindex_offset)?;
128 let metaindex_handle = BlockHandle {
129 offset: metaindex_offset,
130 size: metaindex_data.len() as u64,
131 };
132
133 let footer = Footer {
134 checksum_type: self.options.checksum_type,
135 metaindex_handle,
136 index_handle,
137 format_version: self.options.format_version as u32,
138 base_context_checksum: self.base_context_checksum,
139 };
140 let footer_offset =
142 self.offset + index_block_data.len() as u64 + metaindex_data.len() as u64;
143 let footer_data = footer.encode_to_bytes(footer_offset)?;
144
145 let writer = self.writer.as_mut().unwrap();
147 writer.write_all(&index_block_data)?;
148 self.offset += index_block_data.len() as u64;
149
150 writer.write_all(&metaindex_data)?;
151 self.offset += metaindex_data.len() as u64;
152
153 writer.write_all(&footer_data)?;
154
155 writer.flush()?;
156 self.finished = true;
157
158 Ok(())
159 }
160
161 pub fn file_size(&self) -> u64 {
163 self.offset
164 }
165
166 fn add_entry(&mut self, key: &[u8], value: &[u8], entry_type: EntryType) -> Result<()> {
167 if self.finished {
168 return Err(Error::InvalidArgument("Writer is finished".to_string()));
169 }
170
171 if self.writer.is_none() {
172 return Err(Error::InvalidArgument("No file open".to_string()));
173 }
174
175 if !self.last_key.is_empty() && key <= self.last_key.as_slice() {
177 return Err(Error::InvalidArgument(
178 "Keys must be added in strictly increasing order".to_string(),
179 ));
180 }
181
182 if self.data_block_builder.size_estimate() >= self.options.block_size
184 && !self.data_block_builder.empty()
185 {
186 self.flush_data_block()?;
187 }
188
189 let encoded_value = self.encode_entry_value(value, entry_type);
191
192 self.data_block_builder.add(key, &encoded_value);
194
195 self.last_key.clear();
196 self.last_key.extend_from_slice(key);
197 self.num_entries += 1;
198
199 Ok(())
200 }
201
202 fn flush_data_block(&mut self) -> Result<()> {
203 if self.data_block_builder.empty() {
204 return Ok(());
205 }
206
207 let writer = self.writer.as_mut().unwrap();
208
209 let block_data = self.data_block_builder.finish(
211 self.options.compression,
212 self.options.checksum_type,
213 Some(self.offset),
214 self.base_context_checksum,
215 )?;
216
217 let block_handle = BlockHandle {
219 offset: self.offset,
220 size: block_data.len() as u64,
221 };
222
223 writer.write_all(&block_data)?;
225 self.offset += block_data.len() as u64;
226
227 if let Some((prev_key, prev_handle)) = self.pending_index_entry.take() {
229 self.index_block_builder
230 .add_index_entry(&prev_key, &prev_handle);
231 }
232
233 self.pending_index_entry = Some((self.last_key.clone(), block_handle));
235
236 self.data_block_builder.reset();
238
239 Ok(())
240 }
241
242 fn encode_entry_value(&self, value: &[u8], entry_type: EntryType) -> Vec<u8> {
243 let mut encoded = Vec::with_capacity(value.len() + 1);
246 encoded.push(entry_type as u8);
247 encoded.extend_from_slice(value);
248 encoded
249 }
250
251 fn create_empty_metaindex_block(&self, file_offset: u64) -> Result<Vec<u8>> {
252 let mut block_data = Vec::new();
254
255 block_data.write_u32::<LittleEndian>(0)?; block_data.write_u32::<LittleEndian>(1)?; let mut checksum_data = block_data.clone();
261 checksum_data.push(CompressionType::None as u8);
262 let mut checksum = self.options.checksum_type.calculate(&checksum_data);
263
264 if let Some(base_checksum) = self.base_context_checksum {
266 let modifier = crate::types::checksum_modifier_for_context(base_checksum, file_offset);
267 checksum = checksum.wrapping_add(modifier);
268 }
269
270 block_data.push(CompressionType::None as u8);
272 block_data.write_u32::<LittleEndian>(checksum)?;
273
274 Ok(block_data)
275 }
276}
277
278impl Drop for SstFileWriter {
279 fn drop(&mut self) {
280 if !self.finished && self.writer.is_some() {
281 let _ = self.finish();
283 }
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use crate::sst_reader::SstReader;
    use crate::types::{ChecksumType, CompressionType, FormatVersion};
    use std::path::PathBuf;
    use tempfile::{tempdir, TempDir};

    /// Creates a fresh temporary directory and returns it together with `dir/file_name`.
    ///
    /// Callers must keep the returned `TempDir` alive (bind it to a named variable such
    /// as `_dir`, not `_`) while the path is in use: dropping it deletes the directory.
    fn temp_sst_path(file_name: &str) -> Result<(TempDir, PathBuf)> {
        let dir =
            tempdir().map_err(|e| Error::InvalidArgument(format!("Temp dir failed: {}", e)))?;
        let path = dir.path().join(file_name);
        Ok((dir, path))
    }

    /// Uncompressed options with a 4 KiB block size and a restart interval of 16.
    fn uncompressed_opts(
        format_version: FormatVersion,
        checksum_type: ChecksumType,
    ) -> WriteOptions {
        WriteOptions {
            compression: CompressionType::None,
            block_size: 4096,
            block_restart_interval: 16,
            format_version,
            checksum_type,
        }
    }

    /// Writes two keys using the given format version and checksum type.
    ///
    /// Then reopens the file with `SstReader` and checks that the footer records the
    /// checksum type.
    fn assert_roundtrip(
        file_name: &str,
        format_version: FormatVersion,
        checksum_type: ChecksumType,
    ) -> Result<()> {
        let (_dir, path) = temp_sst_path(file_name)?;
        let opts = uncompressed_opts(format_version, checksum_type);

        {
            let mut writer = SstFileWriter::create(&opts);
            writer.open(&path)?;
            writer.put(b"key1", b"value1")?;
            writer.put(b"key2", b"value2")?;
            writer.finish()?;
        }

        let reader = SstReader::open(&path)?;
        let footer = reader.get_footer();
        assert_eq!(footer.checksum_type, checksum_type);
        Ok(())
    }

    #[test]
    fn test_create_writer() -> Result<()> {
        let opts = WriteOptions::default();
        let writer = SstFileWriter::create(&opts);
        assert_eq!(writer.file_size(), 0);
        Ok(())
    }

    #[test]
    fn test_write_and_read_simple() -> Result<()> {
        let (_dir, path) = temp_sst_path("test.sst")?;
        let opts = uncompressed_opts(FormatVersion::V5, ChecksumType::CRC32c);

        {
            let mut writer = SstFileWriter::create(&opts);
            writer.open(&path)?;
            writer.put(b"key1", b"value1")?;
            writer.put(b"key2", b"value2")?;
            writer.put(b"key3", b"value3")?;
            writer.finish()?;
        }

        let reader = SstReader::open(&path)?;

        let footer = reader.get_footer();
        assert!(footer.index_handle.size > 0);
        assert_eq!(footer.checksum_type, ChecksumType::CRC32c);

        Ok(())
    }

    #[test]
    fn test_key_ordering_enforced() -> Result<()> {
        let (_dir, path) = temp_sst_path("test.sst")?;

        let opts = WriteOptions::default();
        let mut writer = SstFileWriter::create(&opts);
        writer.open(&path)?;

        writer.put(b"key2", b"value2")?;

        let result = writer.put(b"key1", b"value1");
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn test_different_operations() -> Result<()> {
        let (_dir, path) = temp_sst_path("test.sst")?;

        let opts = WriteOptions::default();
        let mut writer = SstFileWriter::create(&opts);
        writer.open(&path)?;

        writer.put(b"key1", b"value1")?;
        writer.delete(b"key2")?;
        writer.merge(b"key3", b"merge_value")?;
        writer.finish()?;

        assert!(writer.file_size() > 0);
        Ok(())
    }

    #[test]
    fn test_compression() -> Result<()> {
        let (_dir, path) = temp_sst_path("test.sst")?;

        // Small blocks so the 100 entries span several compressed data blocks.
        let opts = WriteOptions {
            compression: CompressionType::Snappy,
            block_size: 1024,
            block_restart_interval: 16,
            format_version: FormatVersion::V5,
            checksum_type: ChecksumType::CRC32c,
        };

        let mut writer = SstFileWriter::create(&opts);
        writer.open(&path)?;

        for i in 0..100 {
            let key = format!("key{:03}", i);
            let value = format!("value{:03}_some_long_repeated_data", i);
            writer.put(key.as_bytes(), value.as_bytes())?;
        }

        writer.finish()?;
        assert!(writer.file_size() > 0);
        Ok(())
    }

    #[test]
    fn test_empty_file() -> Result<()> {
        let (_dir, path) = temp_sst_path("empty.sst")?;

        let opts = WriteOptions::default();
        let mut writer = SstFileWriter::create(&opts);
        writer.open(&path)?;
        writer.finish()?;

        // Even with no entries, the index/metaindex blocks are written.
        assert!(writer.file_size() > 0);
        Ok(())
    }

    #[test]
    fn test_file_not_open() -> Result<()> {
        let opts = WriteOptions::default();
        let mut writer = SstFileWriter::create(&opts);

        let result = writer.put(b"key1", b"value1");
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn test_already_finished() -> Result<()> {
        let (_dir, path) = temp_sst_path("test.sst")?;

        let opts = WriteOptions::default();
        let mut writer = SstFileWriter::create(&opts);
        writer.open(&path)?;
        writer.finish()?;

        let result = writer.put(b"key1", b"value1");
        assert!(result.is_err());

        let result = writer.finish();
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn test_roundtrip_v5_xxh3() -> Result<()> {
        assert_roundtrip(
            "checksum_test_v5_xxh3.sst",
            FormatVersion::V5,
            ChecksumType::XXH3,
        )
    }

    #[test]
    fn test_roundtrip_v6_xxh3() -> Result<()> {
        assert_roundtrip(
            "checksum_test_v6_xxh3.sst",
            FormatVersion::V6,
            ChecksumType::XXH3,
        )
    }

    #[test]
    fn test_roundtrip_v7_xxh3() -> Result<()> {
        assert_roundtrip(
            "checksum_test_v7_xxh3.sst",
            FormatVersion::V7,
            ChecksumType::XXH3,
        )
    }
}