summavy_sstable/
lib.rs

1use std::io::{self, Write};
2use std::ops::Range;
3use std::usize;
4
5use merge::ValueMerger;
6
7mod delta;
8mod dictionary;
9pub mod merge;
10mod streamer;
11pub mod value;
12
13mod sstable_index;
14pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
15pub(crate) mod vint;
16pub use dictionary::Dictionary;
17pub use streamer::{Streamer, StreamerBuilder};
18
19mod block_reader;
20pub use self::block_reader::BlockReader;
21pub use self::delta::{DeltaReader, DeltaWriter};
22pub use self::merge::VoidMerge;
23use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter};
24use crate::value::{RangeValueReader, RangeValueWriter};
25
/// Ordinal of a term in an sstable.
///
/// NOTE(review): not used in this file; `Writer` counts inserted terms from `0`
/// (`num_terms`, `first_ordinal_of_the_block`), so this is presumably that
/// 0-based rank of a term — confirm against users of the alias.
pub type TermOrdinal = u64;

/// Initial capacity, in bytes, of the key buffers allocated by
/// `SSTable::reader` and `Writer::new`.
const DEFAULT_KEY_CAPACITY: usize = 50;
29
/// Returns the number of leading bytes shared by `left` and `right`.
///
/// The result never exceeds the length of the shorter slice, and is `0`
/// when either slice is empty or their first bytes already differ.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    let shortest = left.len().min(right.len());
    left.iter()
        .zip(right)
        .position(|(lhs, rhs)| lhs != rhs)
        .unwrap_or(shortest)
}
39
/// Zero-sized error marker.
///
/// NOTE(review): it is not constructed in this file; judging by its name it
/// reports malformed / corrupted sstable bytes — confirm against its users.
#[derive(Debug, Copy, Clone)]
pub struct SSTableDataCorruption;
42
43/// SSTable makes it possible to read and write
44/// sstables with typed values.
45pub trait SSTable: Sized {
46    type Value: Clone;
47    type ValueReader: ValueReader<Value = Self::Value>;
48    type ValueWriter: ValueWriter<Value = Self::Value>;
49
50    fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::ValueWriter> {
51        DeltaWriter::new(write)
52    }
53
54    fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
55        Writer::new(wrt)
56    }
57
58    fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::ValueReader> {
59        DeltaReader::new(reader)
60    }
61
62    fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::ValueReader> {
63        Reader {
64            key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
65            delta_reader: Self::delta_reader(reader),
66        }
67    }
68
69    /// Returns an empty static reader.
70    fn create_empty_reader() -> Reader<'static, Self::ValueReader> {
71        Self::reader(&b""[..])
72    }
73
74    fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(
75        io_readers: Vec<R>,
76        w: W,
77        merger: M,
78    ) -> io::Result<()> {
79        let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
80        let writer = Self::writer(w);
81        merge::merge_sstable::<Self, _, _>(readers, writer, merger)
82    }
83}
84
/// SSTable whose values are `()`: it behaves as a sorted set of keys.
// NOTE(review): `dead_code` is allowed without an explanation; the type is
// `pub`, so the attribute may no longer be needed — confirm.
#[allow(dead_code)]
pub struct VoidSSTable;

impl SSTable for VoidSSTable {
    type Value = ();
    type ValueReader = value::VoidValueReader;
    type ValueWriter = value::VoidValueWriter;
}
93
/// SSTable associating keys to `u64` values that are monotonic
/// with respect to the key order.
///
/// In other words, two keys `k1` and `k2`
/// such that `k1` <= `k2`, are required to observe
/// `sstable[k1] <= sstable[k2]`.
#[allow(dead_code)]
pub struct MonotonicU64SSTable;

impl SSTable for MonotonicU64SSTable {
    type Value = u64;

    type ValueReader = U64MonotonicValueReader;

    type ValueWriter = U64MonotonicValueWriter;
}
110
/// SSTable associating keys to ranges.
/// The ranges are required to partition the
/// space they cover: they follow each other without gaps or overlaps.
///
/// In other words, two consecutive keys `k1` and `k2`
/// are required to observe
/// `range_sstable[k1].end == range_sstable[k2].start`.
///
/// The first range is not required to start at `0`.
pub struct RangeSSTable;

impl SSTable for RangeSSTable {
    type Value = Range<u64>;

    type ValueReader = RangeValueReader;

    type ValueWriter = RangeValueWriter;
}
129
/// SSTable reader.
///
/// Walks the `(key, value)` entries of an sstable in the order they are
/// stored; see `advance`, `key` and `value`.
pub struct Reader<'a, TValueReader> {
    /// Key of the current entry. On each `advance` it is rebuilt from the
    /// prefix it shares with the previous key plus the suffix decoded by
    /// `delta_reader`.
    key: Vec<u8>,
    /// Decoder of the delta-encoded keys and of their values.
    delta_reader: DeltaReader<'a, TValueReader>,
}
135
136impl<'a, TValueReader> Reader<'a, TValueReader>
137where TValueReader: ValueReader
138{
139    pub fn advance(&mut self) -> io::Result<bool> {
140        if !self.delta_reader.advance()? {
141            return Ok(false);
142        }
143        let common_prefix_len = self.delta_reader.common_prefix_len();
144        let suffix = self.delta_reader.suffix();
145        let new_len = self.delta_reader.common_prefix_len() + suffix.len();
146        self.key.resize(new_len, 0u8);
147        self.key[common_prefix_len..].copy_from_slice(suffix);
148        Ok(true)
149    }
150
151    #[inline(always)]
152    pub fn key(&self) -> &[u8] {
153        &self.key
154    }
155
156    #[inline(always)]
157    pub fn value(&self) -> &TValueReader::Value {
158        self.delta_reader.value()
159    }
160}
161
162impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
163    #[inline(always)]
164    fn as_ref(&self) -> &[u8] {
165        &self.key
166    }
167}
168
/// SSTable writer.
///
/// Keys must be inserted in increasing order. They are delta-encoded into
/// blocks, and an index of those blocks is appended by `finish`.
pub struct Writer<W, TValueWriter>
where W: io::Write
{
    /// Last key inserted in the current block. It is cleared whenever a block
    /// is flushed, so the first key of each block is written in full.
    previous_key: Vec<u8>,
    /// Receives one entry per flushed block; serialized by `finish`.
    index_builder: SSTableIndexBuilder,
    /// Encodes keys and values into blocks over the output writer.
    delta_writer: DeltaWriter<W, TValueWriter>,
    /// Number of values inserted so far.
    num_terms: u64,
    /// Value of `num_terms` when the current block started.
    first_ordinal_of_the_block: u64,
}
178
179impl<W, TValueWriter> Writer<W, TValueWriter>
180where
181    W: io::Write,
182    TValueWriter: value::ValueWriter,
183{
184    /// Use `Self::new`. This method only exists to match its
185    /// equivalent in fst.
186    /// TODO remove this function. (See Issue #1727)
187    #[doc(hidden)]
188    pub fn create(wrt: W) -> io::Result<Self> {
189        Ok(Self::new(wrt))
190    }
191
192    /// Creates a new `TermDictionaryBuilder`.
193    pub fn new(wrt: W) -> Self {
194        Writer {
195            previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
196            num_terms: 0u64,
197            index_builder: SSTableIndexBuilder::default(),
198            delta_writer: DeltaWriter::new(wrt),
199            first_ordinal_of_the_block: 0u64,
200        }
201    }
202
203    /// Returns the last inserted key.
204    /// If no key has been inserted yet, or the block was just
205    /// flushed, this function returns "".
206    #[inline(always)]
207    pub(crate) fn last_inserted_key(&self) -> &[u8] {
208        &self.previous_key[..]
209    }
210
211    /// Inserts a `(key, value)` pair in the term dictionary.
212    /// Keys have to be inserted in order.
213    ///
214    /// # Panics
215    ///
216    /// Will panics if keys are inserted in an invalid order.
217    #[inline]
218    pub fn insert<K: AsRef<[u8]>>(
219        &mut self,
220        key: K,
221        value: &TValueWriter::Value,
222    ) -> io::Result<()> {
223        self.insert_key(key.as_ref())?;
224        self.insert_value(value)?;
225        Ok(())
226    }
227
228    /// # Warning
229    ///
230    /// Horribly dangerous internal API. See `.insert(...)`.
231    #[doc(hidden)]
232    #[inline]
233    pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
234        // If this is the first key in the block, we use it to
235        // shorten the last term in the last block.
236        if self.first_ordinal_of_the_block == self.num_terms {
237            self.index_builder
238                .shorten_last_block_key_given_next_key(key);
239        }
240        let keep_len = common_prefix_len(&self.previous_key, key);
241        let add_len = key.len() - keep_len;
242        let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
243            || self.previous_key.is_empty()
244            || self.previous_key[keep_len] < key[keep_len];
245        assert!(
246            increasing_keys,
247            "Keys should be increasing. ({:?} > {:?})",
248            self.previous_key, key
249        );
250        self.previous_key.resize(key.len(), 0u8);
251        self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
252        self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
253        Ok(())
254    }
255
256    /// # Warning
257    ///
258    /// Horribly dangerous internal API. See `.insert(...)`.
259    #[doc(hidden)]
260    #[inline]
261    pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
262        self.delta_writer.write_value(value);
263        self.num_terms += 1u64;
264        self.flush_block_if_required()
265    }
266
267    pub fn flush_block_if_required(&mut self) -> io::Result<()> {
268        if let Some(byte_range) = self.delta_writer.flush_block_if_required()? {
269            self.index_builder.add_block(
270                &self.previous_key[..],
271                byte_range,
272                self.first_ordinal_of_the_block,
273            );
274            self.first_ordinal_of_the_block = self.num_terms;
275            self.previous_key.clear();
276        }
277        Ok(())
278    }
279
280    pub fn finish(mut self) -> io::Result<W> {
281        if let Some(byte_range) = self.delta_writer.flush_block()? {
282            self.index_builder.add_block(
283                &self.previous_key[..],
284                byte_range,
285                self.first_ordinal_of_the_block,
286            );
287            self.first_ordinal_of_the_block = self.num_terms;
288        }
289        let mut wrt = self.delta_writer.finish();
290        wrt.write_all(&0u32.to_le_bytes())?;
291
292        let offset = wrt.written_bytes();
293
294        self.index_builder.serialize(&mut wrt)?;
295        wrt.write_all(&offset.to_le_bytes())?;
296        wrt.write_all(&self.num_terms.to_le_bytes())?;
297        let wrt = wrt.finish();
298        Ok(wrt.into_inner()?)
299    }
300}
301
302impl<TValueWriter> Writer<Vec<u8>, TValueWriter>
303where TValueWriter: value::ValueWriter
304{
305    #[inline]
306    pub fn insert_cannot_fail<K: AsRef<[u8]>>(&mut self, key: K, value: &TValueWriter::Value) {
307        self.insert(key, value)
308            .expect("SSTable over a Vec should never fail");
309    }
310}
311
#[cfg(test)]
mod test {
    use std::io;
    use std::ops::Bound;

    use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable};

    /// Checks `common_prefix_len` in both argument orders, since the result
    /// must not depend on the order.
    fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
        assert_eq!(
            common_prefix_len(left.as_bytes(), right.as_bytes()),
            expect_len
        );
        assert_eq!(
            common_prefix_len(right.as_bytes(), left.as_bytes()),
            expect_len
        );
    }

    #[test]
    fn test_common_prefix_len() {
        aux_test_common_prefix_len("a", "ab", 1);
        aux_test_common_prefix_len("", "ab", 0);
        aux_test_common_prefix_len("ab", "abc", 2);
        aux_test_common_prefix_len("abde", "abce", 2);
    }

    /// Round-trips long keys (1024 and 299 bytes) interleaved with a short one.
    #[test]
    fn test_long_key_diff() {
        let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&long_key[..], &()).is_ok());
            assert!(sstable_writer.insert(&[0, 3, 4], &()).is_ok());
            assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[0, 3, 4]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key2[..]);
        assert!(!sstable_reader.advance().unwrap());
    }

    /// Pins the exact serialized bytes of a three-key sstable, then reads it back.
    #[test]
    fn test_simple_sstable() {
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&[17u8], &()).is_ok());
            assert!(sstable_writer.insert(&[17u8, 18u8, 19u8], &()).is_ok());
            assert!(sstable_writer.insert(&[17u8, 20u8], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        assert_eq!(
            &buffer,
            &[
                // block len
                7u8, 0u8, 0u8, 0u8,
                // keep 0 push 1 | 17
                16u8, 17u8,
                // keep 1 push 2 | 18 19
                33u8, 18u8, 19u8,
                // keep 1 push 1 | 20
                17u8, 20u8,
                // no more blocks
                0u8, 0u8, 0u8, 0u8,
                // index (bytes as produced by `SSTableIndexBuilder::serialize`)
                161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 115, 108, 97, 115, 116, 95, 107,
                101, 121, 95, 111, 114, 95, 103, 114, 101, 97, 116, 101, 114, 130, 17, 20, 106, 98,
                108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106, 98, 121, 116, 101, 95, 114, 97,
                110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0, 99, 101, 110, 100, 11, 109,
                102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110, 97, 108, 0,
                // offset for the index
                15, 0, 0, 0, 0, 0, 0, 0,
                // num terms
                3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8
            ]
        );
        let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
        assert!(!sstable_reader.advance().unwrap());
    }

    /// Inserting a key smaller than the previous one must panic.
    #[test]
    #[should_panic]
    fn test_simple_sstable_non_increasing_key() {
        let mut buffer = vec![];
        let mut sstable_writer = VoidSSTable::writer(&mut buffer);
        assert!(sstable_writer.insert(&[17u8], &()).is_ok());
        assert!(sstable_writer.insert(&[16u8], &()).is_ok());
    }

    /// Merging an sstable with itself must reproduce it byte for byte.
    #[test]
    fn test_merge_abcd_abe() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            writer.insert(b"abcd", &()).unwrap();
            writer.insert(b"abe", &()).unwrap();
            writer.finish().unwrap();
        }
        let mut output = Vec::new();
        assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
        assert_eq!(&output[..], &buffer[..]);
    }

    /// Same self-merge check as `test_merge_abcd_abe`, also checking
    /// `last_inserted_key` after each insertion.
    #[test]
    fn test_sstable() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            assert_eq!(writer.last_inserted_key(), b"");
            writer.insert(b"abcd", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abcd");
            writer.insert(b"abe", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abe");
            writer.finish().unwrap();
        }
        let mut output = Vec::new();
        assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
        assert_eq!(&output[..], &buffer[..]);
    }

    /// Round-trips keys with `u64` values through `MonotonicU64SSTable`.
    #[test]
    fn test_sstable_u64() -> io::Result<()> {
        let mut buffer = Vec::new();
        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
        writer.insert(b"abcd", &1u64)?;
        writer.insert(b"abe", &4u64)?;
        writer.insert(b"gogo", &4324234234234234u64)?;
        writer.finish()?;
        let mut reader = MonotonicU64SSTable::reader(&buffer[..]);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abcd");
        assert_eq!(reader.value(), &1u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abe");
        assert_eq!(reader.value(), &4u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"gogo");
        assert_eq!(reader.value(), &4324234234234234u64);
        assert!(!reader.advance()?);
        Ok(())
    }

    /// The empty reader must yield no entry.
    #[test]
    fn test_sstable_empty() {
        let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader();
        assert!(!sstable_range_empty.advance().unwrap());
    }

    use common::file_slice::FileSlice;
    use proptest::prelude::*;

    use crate::Dictionary;

    /// Random bound (unbounded, included or excluded) over keys made of `a`..`c`.
    fn bound_strategy() -> impl Strategy<Value = Bound<String>> {
        prop_oneof![
            Just(Bound::<String>::Unbounded),
            "[a-c]{0,5}".prop_map(|key| Bound::Included(key)),
            "[a-c]{0,5}".prop_map(|key| Bound::Excluded(key)),
        ]
    }

    /// Returns the key carried by `bound`, or `None` when it is unbounded.
    fn extract_key(bound: Bound<&String>) -> Option<&str> {
        match bound.as_ref() {
            Bound::Included(key) => Some(key.as_str()),
            Bound::Excluded(key) => Some(key.as_str()),
            Bound::Unbounded => None,
        }
    }

    /// Pairs of (lower, upper) bounds that `BTreeSet::range` accepts.
    ///
    /// Requiring the keys to be strictly ordered avoids `BTreeSet::range`
    /// panicking on inverted bounds or on two equal `Excluded` bounds, at the
    /// cost of never generating equal keys.
    // NOTE(review): the filter description says "<=" while the predicate uses
    // `<`; the description is left untouched here.
    fn bounds_strategy() -> impl Strategy<Value = (Bound<String>, Bound<String>)> {
        (bound_strategy(), bound_strategy()).prop_filter(
            "Lower bound <= Upper bound",
            |(left, right)| match (extract_key(left.as_ref()), extract_key(right.as_ref())) {
                (None, _) => true,
                (_, None) => true,
                (left, right) => left < right,
            },
        )
    }

    proptest! {
        /// Checks dictionary range queries against `BTreeSet::range` over the
        /// same words.
        #[test]
        fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
            (lower_bound, upper_bound) in bounds_strategy(),
        ) {
            // TODO tweak block size.
            let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
            for word in &words {
                builder.insert(word.as_bytes(), &()).unwrap();
            }
            let buffer: Vec<u8> = builder.finish().unwrap();
            let dictionary: Dictionary<VoidSSTable> = Dictionary::open(FileSlice::from(buffer)).unwrap();
            let mut range_builder = dictionary.range();
            range_builder = match lower_bound.as_ref() {
                Bound::Included(key) => range_builder.ge(key.as_bytes()),
                Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            range_builder = match upper_bound.as_ref() {
                Bound::Included(key) => range_builder.le(key.as_bytes()),
                Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            let mut stream = range_builder.into_stream().unwrap();
            let mut btree_set_range = words.range((lower_bound, upper_bound));
            while stream.advance() {
                let val = btree_set_range.next().unwrap();
                assert_eq!(val.as_bytes(), stream.key());
            }
            assert!(btree_set_range.next().is_none());
        }
    }
}