1use std::io::{self, Write};
2use std::ops::Range;
3use std::usize;
4
5use merge::ValueMerger;
6
7mod delta;
8mod dictionary;
9pub mod merge;
10mod streamer;
11pub mod value;
12
13mod sstable_index_v3;
14pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
15mod sstable_index_v2;
16pub(crate) mod vint;
17pub use dictionary::Dictionary;
18pub use streamer::{Streamer, StreamerBuilder};
19
20mod block_reader;
21use common::{BinarySerializable, OwnedBytes};
22
23pub use self::block_reader::BlockReader;
24pub use self::delta::{DeltaReader, DeltaWriter};
25pub use self::merge::VoidMerge;
26use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter};
27use crate::value::{RangeValueReader, RangeValueWriter};
28
/// Integer identifying a term's position in an sstable.
///
/// NOTE(review): presumably the 0-based rank of the key among all keys of the
/// sstable (the writer counts inserted terms as `u64`) — confirm against users.
pub type TermOrdinal = u64;

/// Initial capacity, in bytes, of the key buffers allocated by
/// `SSTable::reader` and `Writer::new`.
const DEFAULT_KEY_CAPACITY: usize = 50;
/// Format version written as the very last field of the footer by `Writer::finish`.
const SSTABLE_VERSION: u32 = 3;
33
/// Returns the length, in bytes, of the longest prefix shared by `left` and `right`.
///
/// The writer uses it to delta-encode each key against the key inserted before it.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    // If no mismatch is found, the shorter slice is entirely a prefix of the other.
    let shortest = left.len().min(right.len());
    left.iter()
        .zip(right)
        .position(|(l, r)| l != r)
        .unwrap_or(shortest)
}
43
/// Payload-free marker type named for corrupted sstable data.
///
/// NOTE(review): not produced anywhere in this file; confirm where the crate
/// uses it before relying on a particular meaning.
#[derive(Clone, Copy, Debug)]
pub struct SSTableDataCorruption;
46
47pub trait SSTable: Sized {
50 type Value: Clone;
51 type ValueReader: ValueReader<Value = Self::Value>;
52 type ValueWriter: ValueWriter<Value = Self::Value>;
53
54 fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::ValueWriter> {
55 DeltaWriter::new(write)
56 }
57
58 fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
59 Writer::new(wrt)
60 }
61
62 fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
63 DeltaReader::new(reader)
64 }
65
66 fn reader(reader: OwnedBytes) -> Reader<Self::ValueReader> {
67 Reader {
68 key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
69 delta_reader: Self::delta_reader(reader),
70 }
71 }
72
73 fn create_empty_reader() -> Reader<Self::ValueReader> {
75 Self::reader(OwnedBytes::empty())
76 }
77
78 fn merge<W: io::Write, M: ValueMerger<Self::Value>>(
79 io_readers: Vec<OwnedBytes>,
80 w: W,
81 merger: M,
82 ) -> io::Result<()> {
83 let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
84 let writer = Self::writer(w);
85 merge::merge_sstable::<Self, _, _>(readers, writer, merger)
86 }
87}
88
89#[allow(dead_code)]
90pub struct VoidSSTable;
91
92impl SSTable for VoidSSTable {
93 type Value = ();
94 type ValueReader = value::VoidValueReader;
95 type ValueWriter = value::VoidValueWriter;
96}
97
98#[allow(dead_code)]
105pub struct MonotonicU64SSTable;
106
107impl SSTable for MonotonicU64SSTable {
108 type Value = u64;
109
110 type ValueReader = U64MonotonicValueReader;
111
112 type ValueWriter = U64MonotonicValueWriter;
113}
114
115#[derive(Clone, Copy, Debug)]
125pub struct RangeSSTable;
126
127impl SSTable for RangeSSTable {
128 type Value = Range<u64>;
129
130 type ValueReader = RangeValueReader;
131
132 type ValueWriter = RangeValueWriter;
133}
134
135pub struct Reader<TValueReader> {
137 key: Vec<u8>,
138 delta_reader: DeltaReader<TValueReader>,
139}
140
141impl<TValueReader> Reader<TValueReader>
142where TValueReader: ValueReader
143{
144 pub fn advance(&mut self) -> io::Result<bool> {
145 if !self.delta_reader.advance()? {
146 return Ok(false);
147 }
148 let common_prefix_len = self.delta_reader.common_prefix_len();
149 let suffix = self.delta_reader.suffix();
150 let new_len = self.delta_reader.common_prefix_len() + suffix.len();
151 self.key.resize(new_len, 0u8);
152 self.key[common_prefix_len..].copy_from_slice(suffix);
153 Ok(true)
154 }
155
156 #[inline(always)]
157 pub fn key(&self) -> &[u8] {
158 &self.key
159 }
160
161 #[inline(always)]
162 pub fn value(&self) -> &TValueReader::Value {
163 self.delta_reader.value()
164 }
165}
166
167impl<TValueReader> AsRef<[u8]> for Reader<TValueReader> {
168 #[inline(always)]
169 fn as_ref(&self) -> &[u8] {
170 &self.key
171 }
172}
173
174pub struct Writer<W, TValueWriter>
175where W: io::Write
176{
177 previous_key: Vec<u8>,
178 index_builder: SSTableIndexBuilder,
179 delta_writer: DeltaWriter<W, TValueWriter>,
180 num_terms: u64,
181 first_ordinal_of_the_block: u64,
182}
183
184impl<W, TValueWriter> Writer<W, TValueWriter>
185where
186 W: io::Write,
187 TValueWriter: value::ValueWriter,
188{
189 #[doc(hidden)]
193 pub fn create(wrt: W) -> io::Result<Self> {
194 Ok(Self::new(wrt))
195 }
196
197 pub fn new(wrt: W) -> Self {
199 Writer {
200 previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
201 num_terms: 0u64,
202 index_builder: SSTableIndexBuilder::default(),
203 delta_writer: DeltaWriter::new(wrt),
204 first_ordinal_of_the_block: 0u64,
205 }
206 }
207
208 pub fn set_block_len(&mut self, block_len: usize) {
213 self.delta_writer.set_block_len(block_len)
214 }
215
216 #[inline(always)]
220 pub(crate) fn last_inserted_key(&self) -> &[u8] {
221 &self.previous_key[..]
222 }
223
224 #[inline]
231 pub fn insert<K: AsRef<[u8]>>(
232 &mut self,
233 key: K,
234 value: &TValueWriter::Value,
235 ) -> io::Result<()> {
236 self.insert_key(key.as_ref())?;
237 self.insert_value(value)?;
238 Ok(())
239 }
240
241 #[doc(hidden)]
245 #[inline]
246 pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
247 if self.first_ordinal_of_the_block == self.num_terms {
250 self.index_builder
251 .shorten_last_block_key_given_next_key(key);
252 }
253 let keep_len = common_prefix_len(&self.previous_key, key);
254 let add_len = key.len() - keep_len;
255 let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
256 || self.previous_key.is_empty()
257 || self.previous_key[keep_len] < key[keep_len];
258 assert!(
259 increasing_keys,
260 "Keys should be increasing. ({:?} > {key:?})",
261 self.previous_key
262 );
263 self.previous_key.resize(key.len(), 0u8);
264 self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
265 self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
266 Ok(())
267 }
268
269 #[doc(hidden)]
273 #[inline]
274 pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
275 self.delta_writer.write_value(value);
276 self.num_terms += 1u64;
277 self.flush_block_if_required()
278 }
279
280 pub fn flush_block_if_required(&mut self) -> io::Result<()> {
281 if let Some(byte_range) = self.delta_writer.flush_block_if_required()? {
282 self.index_builder.add_block(
283 &self.previous_key[..],
284 byte_range,
285 self.first_ordinal_of_the_block,
286 );
287 self.first_ordinal_of_the_block = self.num_terms;
288 self.previous_key.clear();
289 }
290 Ok(())
291 }
292
293 pub fn finish(mut self) -> io::Result<W> {
294 if let Some(byte_range) = self.delta_writer.flush_block()? {
295 self.index_builder.add_block(
296 &self.previous_key[..],
297 byte_range,
298 self.first_ordinal_of_the_block,
299 );
300 self.first_ordinal_of_the_block = self.num_terms;
301 }
302 let mut wrt = self.delta_writer.finish();
303 wrt.write_all(&0u32.to_le_bytes())?;
305
306 let offset = wrt.written_bytes();
307
308 let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
309 wrt.write_all(&fst_len.to_le_bytes())?;
310 wrt.write_all(&offset.to_le_bytes())?;
311 wrt.write_all(&self.num_terms.to_le_bytes())?;
312
313 SSTABLE_VERSION.serialize(&mut wrt)?;
314
315 let wrt = wrt.finish();
316 Ok(wrt.into_inner()?)
317 }
318}
319
// Unit and property tests for the writer/reader round trip, merging and range
// queries of this module.
#[cfg(test)]
mod test {
    use std::io;
    use std::ops::Bound;

    use common::OwnedBytes;

    use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable};

    // Checks `common_prefix_len` in both argument orders, since the result must be symmetric.
    fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
        assert_eq!(
            common_prefix_len(left.as_bytes(), right.as_bytes()),
            expect_len
        );
        assert_eq!(
            common_prefix_len(right.as_bytes(), left.as_bytes()),
            expect_len
        );
    }

    #[test]
    fn test_common_prefix_len() {
        aux_test_common_prefix_len("a", "ab", 1);
        aux_test_common_prefix_len("", "ab", 0);
        aux_test_common_prefix_len("ab", "abc", 2);
        aux_test_common_prefix_len("abde", "abce", 2);
    }

    // Round-trips keys far longer than DEFAULT_KEY_CAPACITY, interleaved with a
    // short key, so the delta encoding must both shrink and regrow the key.
    #[test]
    fn test_long_key_diff() {
        let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&long_key[..], &()).is_ok());
            assert!(sstable_writer.insert([0, 3, 4], &()).is_ok());
            assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        let buffer = OwnedBytes::new(buffer);
        let mut sstable_reader = VoidSSTable::reader(buffer);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[0, 3, 4]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key2[..]);
        assert!(!sstable_reader.advance().unwrap());
    }

    // Pins the exact serialized bytes of a 3-key sstable, then reads it back.
    #[test]
    fn test_simple_sstable() {
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert([17u8], &()).is_ok());
            assert!(sstable_writer.insert([17u8, 18u8, 19u8], &()).is_ok());
            assert!(sstable_writer.insert([17u8, 20u8], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        assert_eq!(
            &buffer,
            &[
                // data block: presumably a length prefix (8) followed by 8 encoded bytes
                8, 0, 0, 0, 0, 16, 17, 33, 18, 19, 17, 20,
                // end-of-blocks marker (`0u32` written by `Writer::finish`)
                0, 0, 0, 0,
                // index length as u64 (0: the serialized index takes no bytes here)
                0, 0, 0, 0, 0, 0, 0, 0,
                // offset at which the index starts = 16
                16, 0, 0, 0, 0, 0, 0, 0,
                // number of terms as u64 = 3
                3, 0, 0, 0, 0, 0, 0, 0,
                // SSTABLE_VERSION = 3
                3, 0, 0, 0,
            ]
        );
        let buffer = OwnedBytes::new(buffer);
        let mut sstable_reader = VoidSSTable::reader(buffer);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
        assert!(!sstable_reader.advance().unwrap());
    }

    // Inserting a smaller key after a larger one must trip the writer's ordering assertion.
    #[test]
    #[should_panic]
    fn test_simple_sstable_non_increasing_key() {
        let mut buffer = vec![];
        let mut sstable_writer = VoidSSTable::writer(&mut buffer);
        assert!(sstable_writer.insert([17u8], &()).is_ok());
        assert!(sstable_writer.insert([16u8], &()).is_ok());
    }

    // Merging an sstable with itself (VoidMerge) must reproduce the original bytes.
    #[test]
    fn test_merge_abcd_abe() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            writer.insert(b"abcd", &()).unwrap();
            writer.insert(b"abe", &()).unwrap();
            writer.finish().unwrap();
        }
        let buffer = OwnedBytes::new(buffer);
        let mut output = Vec::new();
        assert!(
            VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
                .is_ok()
        );
        assert_eq!(&output[..], &buffer[..]);
    }

    // Same scenario as `test_merge_abcd_abe`, additionally checking
    // `last_inserted_key` before and after each insertion.
    #[test]
    fn test_sstable() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            assert_eq!(writer.last_inserted_key(), b"");
            writer.insert(b"abcd", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abcd");
            writer.insert(b"abe", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abe");
            writer.finish().unwrap();
        }
        let buffer = OwnedBytes::new(buffer);
        let mut output = Vec::new();
        assert!(
            VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
                .is_ok()
        );
        assert_eq!(&output[..], &buffer[..]);
    }

    // Round-trips u64 values, including a large one, through MonotonicU64SSTable.
    #[test]
    fn test_sstable_u64() -> io::Result<()> {
        let mut buffer = Vec::new();
        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
        writer.insert(b"abcd", &1u64)?;
        writer.insert(b"abe", &4u64)?;
        writer.insert(b"gogo", &4324234234234234u64)?;
        writer.finish()?;
        let buffer = OwnedBytes::new(buffer);
        let mut reader = MonotonicU64SSTable::reader(buffer);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abcd");
        assert_eq!(reader.value(), &1u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abe");
        assert_eq!(reader.value(), &4u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"gogo");
        assert_eq!(reader.value(), &4324234234234234u64);
        assert!(!reader.advance()?);
        Ok(())
    }

    // A reader over zero bytes must report no entries rather than erroring.
    #[test]
    fn test_sstable_empty() {
        let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader();
        assert!(!sstable_range_empty.advance().unwrap());
    }

    // Imports used only by the property test below.
    use common::file_slice::FileSlice;
    use proptest::prelude::*;

    use crate::Dictionary;

    // Generates an unbounded, inclusive or exclusive bound over short a/b/c strings.
    fn bound_strategy() -> impl Strategy<Value = Bound<String>> {
        prop_oneof![
            Just(Bound::<String>::Unbounded),
            "[a-c]{0,5}".prop_map(Bound::Included),
            "[a-c]{0,5}".prop_map(Bound::Excluded),
        ]
    }

    // Returns the key carried by a bound, or None when unbounded.
    fn extract_key(bound: Bound<&String>) -> Option<&str> {
        match bound.as_ref() {
            Bound::Included(key) => Some(key.as_str()),
            Bound::Excluded(key) => Some(key.as_str()),
            Bound::Unbounded => None,
        }
    }

    // Generates (lower, upper) bound pairs accepted by `BTreeSet::range`.
    // NOTE: the filter label says "<=" but the predicate is a strict "<". That is
    // a conservative guard: `BTreeSet::range` panics when start > end, and when
    // start == end with both bounds Excluded.
    fn bounds_strategy() -> impl Strategy<Value = (Bound<String>, Bound<String>)> {
        (bound_strategy(), bound_strategy()).prop_filter(
            "Lower bound <= Upper bound",
            |(left, right)| match (extract_key(left.as_ref()), extract_key(right.as_ref())) {
                (None, _) => true,
                (_, None) => true,
                (left, right) => left < right,
            },
        )
    }

    proptest! {
        // Builds a dictionary with tiny (16-byte) blocks from a sorted word set,
        // so entries span many blocks, then checks that range queries return
        // exactly what `BTreeSet::range` returns for the same bounds.
        #[test]
        fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
            (lower_bound, upper_bound) in bounds_strategy(),
        ) {
            let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
            builder.set_block_len(16);
            // BTreeSet iterates in sorted order, satisfying the increasing-key requirement.
            for word in &words {
                builder.insert(word.as_bytes(), &()).unwrap();
            }
            let buffer: Vec<u8> = builder.finish().unwrap();
            let dictionary: Dictionary<VoidSSTable> = Dictionary::open(FileSlice::from(buffer)).unwrap();
            let mut range_builder = dictionary.range();
            range_builder = match lower_bound.as_ref() {
                Bound::Included(key) => range_builder.ge(key.as_bytes()),
                Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            range_builder = match upper_bound.as_ref() {
                Bound::Included(key) => range_builder.le(key.as_bytes()),
                Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            let mut stream = range_builder.into_stream().unwrap();
            let mut btree_set_range = words.range((lower_bound, upper_bound));
            while stream.advance() {
                let val = btree_set_range.next().unwrap();
                assert_eq!(val.as_bytes(), stream.key());
            }
            // The stream must not stop early either.
            assert!(btree_set_range.next().is_none());
        }
    }
}