tantivy_sstable/
lib.rs

1use std::io::{self, Write};
2use std::ops::Range;
3use std::usize;
4
5use merge::ValueMerger;
6
7mod delta;
8mod dictionary;
9pub mod merge;
10mod streamer;
11pub mod value;
12
13mod sstable_index_v3;
14pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
15mod sstable_index_v2;
16pub(crate) mod vint;
17pub use dictionary::Dictionary;
18pub use streamer::{Streamer, StreamerBuilder};
19
20mod block_reader;
21use common::{BinarySerializable, OwnedBytes};
22
23pub use self::block_reader::BlockReader;
24pub use self::delta::{DeltaReader, DeltaWriter};
25pub use self::merge::VoidMerge;
26use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter};
27use crate::value::{RangeValueReader, RangeValueWriter};
28
/// Ordinal of a term, represented as a `u64`.
///
/// NOTE(review): presumably the rank of a key in the sorted key order of an
/// sstable — confirm against the dictionary code, which is not visible here.
pub type TermOrdinal = u64;
30
/// Initial capacity (in bytes) of the key buffers allocated by `Reader` and `Writer`.
const DEFAULT_KEY_CAPACITY: usize = 50;
/// Format version number serialized by `Writer::finish` at the very end of the output.
const SSTABLE_VERSION: u32 = 3;
33
/// Returns the number of leading bytes shared by `left` and `right`.
///
/// The scan stops at the first position where the two slices differ, or
/// at the end of the shorter slice, whichever comes first.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    let mut shared_len = 0usize;
    for (left_byte, right_byte) in left.iter().zip(right) {
        if left_byte != right_byte {
            break;
        }
        shared_len += 1;
    }
    shared_len
}
43
/// Unit marker type.
///
/// NOTE(review): presumably used to signal that sstable bytes could not be
/// decoded; it is not referenced anywhere in this file — confirm against its users.
#[derive(Debug, Copy, Clone)]
pub struct SSTableDataCorruption;
46
47/// SSTable makes it possible to read and write
48/// sstables with typed values.
49pub trait SSTable: Sized {
50    type Value: Clone;
51    type ValueReader: ValueReader<Value = Self::Value>;
52    type ValueWriter: ValueWriter<Value = Self::Value>;
53
54    fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::ValueWriter> {
55        DeltaWriter::new(write)
56    }
57
58    fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
59        Writer::new(wrt)
60    }
61
62    fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
63        DeltaReader::new(reader)
64    }
65
66    fn reader(reader: OwnedBytes) -> Reader<Self::ValueReader> {
67        Reader {
68            key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
69            delta_reader: Self::delta_reader(reader),
70        }
71    }
72
73    /// Returns an empty static reader.
74    fn create_empty_reader() -> Reader<Self::ValueReader> {
75        Self::reader(OwnedBytes::empty())
76    }
77
78    fn merge<W: io::Write, M: ValueMerger<Self::Value>>(
79        io_readers: Vec<OwnedBytes>,
80        w: W,
81        merger: M,
82    ) -> io::Result<()> {
83        let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
84        let writer = Self::writer(w);
85        merge::merge_sstable::<Self, _, _>(readers, writer, merger)
86    }
87}
88
/// SSTable flavor whose value type is the unit type `()`.
#[allow(dead_code)]
pub struct VoidSSTable;

// Values are `()`; they are read and written through the `Void*` helpers of the `value` module.
impl SSTable for VoidSSTable {
    type Value = ();
    type ValueReader = value::VoidValueReader;
    type ValueWriter = value::VoidValueWriter;
}
97
/// SSTable associating keys to `u64` values
/// sorted in order.
///
/// In other words, two keys `k1` and `k2`
/// such that `k1 <= k2` are required to observe
/// `sstable[k1] <= sstable[k2]`.
#[allow(dead_code)]
pub struct MonotonicU64SSTable;

// Values are `u64`, handled by the monotonic `u64` reader/writer imported from the `value` module.
impl SSTable for MonotonicU64SSTable {
    type Value = u64;

    type ValueReader = U64MonotonicValueReader;

    type ValueWriter = U64MonotonicValueWriter;
}
114
/// SSTable associating keys to ranges.
/// The ranges are required to partition the
/// space.
///
/// In other words, two consecutive keys `k1` and `k2`
/// are required to observe
/// `range_sstable[k1].end == range_sstable[k2].start`.
///
/// The first range is not required to start at `0`.
#[derive(Clone, Copy, Debug)]
pub struct RangeSSTable;

// Values are `Range<u64>`, handled by the range reader/writer imported from the `value` module.
impl SSTable for RangeSSTable {
    type Value = Range<u64>;

    type ValueReader = RangeValueReader;

    type ValueWriter = RangeValueWriter;
}
134
/// SSTable reader.
///
/// Walks the entries exposed by a `DeltaReader` and rebuilds the full key
/// of the current entry.
pub struct Reader<TValueReader> {
    // Full key of the current entry; `advance` rebuilds it from the common
    // prefix length and the suffix reported by `delta_reader`.
    key: Vec<u8>,
    // Underlying reader providing, for each entry, the common prefix length,
    // the key suffix and the value.
    delta_reader: DeltaReader<TValueReader>,
}
140
141impl<TValueReader> Reader<TValueReader>
142where TValueReader: ValueReader
143{
144    pub fn advance(&mut self) -> io::Result<bool> {
145        if !self.delta_reader.advance()? {
146            return Ok(false);
147        }
148        let common_prefix_len = self.delta_reader.common_prefix_len();
149        let suffix = self.delta_reader.suffix();
150        let new_len = self.delta_reader.common_prefix_len() + suffix.len();
151        self.key.resize(new_len, 0u8);
152        self.key[common_prefix_len..].copy_from_slice(suffix);
153        Ok(true)
154    }
155
156    #[inline(always)]
157    pub fn key(&self) -> &[u8] {
158        &self.key
159    }
160
161    #[inline(always)]
162    pub fn value(&self) -> &TValueReader::Value {
163        self.delta_reader.value()
164    }
165}
166
167impl<TValueReader> AsRef<[u8]> for Reader<TValueReader> {
168    #[inline(always)]
169    fn as_ref(&self) -> &[u8] {
170        &self.key
171    }
172}
173
/// SSTable writer.
///
/// Keys and values are pushed through a `DeltaWriter`, while an
/// `SSTableIndexBuilder` records every flushed block.
pub struct Writer<W, TValueWriter>
where W: io::Write
{
    // Last key inserted in the current block; cleared whenever a block is
    // flushed by `flush_block_if_required`.
    previous_key: Vec<u8>,
    // Collects one entry per flushed block (last key, byte range, first ordinal).
    index_builder: SSTableIndexBuilder,
    // Receives the key suffixes and the values, and decides when a block is flushed.
    delta_writer: DeltaWriter<W, TValueWriter>,
    // Number of values inserted so far.
    num_terms: u64,
    // Value of `num_terms` at the time the current block was started.
    first_ordinal_of_the_block: u64,
}
183
184impl<W, TValueWriter> Writer<W, TValueWriter>
185where
186    W: io::Write,
187    TValueWriter: value::ValueWriter,
188{
189    /// Use `Self::new`. This method only exists to match its
190    /// equivalent in fst.
191    /// TODO remove this function. (See Issue #1727)
192    #[doc(hidden)]
193    pub fn create(wrt: W) -> io::Result<Self> {
194        Ok(Self::new(wrt))
195    }
196
197    /// Creates a new `TermDictionaryBuilder`.
198    pub fn new(wrt: W) -> Self {
199        Writer {
200            previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
201            num_terms: 0u64,
202            index_builder: SSTableIndexBuilder::default(),
203            delta_writer: DeltaWriter::new(wrt),
204            first_ordinal_of_the_block: 0u64,
205        }
206    }
207
208    /// Set the target block length.
209    ///
210    /// The delta part of a block will generally be slightly larger than the requested `block_len`,
211    /// however this does not account for the length of the Value part of the table.
212    pub fn set_block_len(&mut self, block_len: usize) {
213        self.delta_writer.set_block_len(block_len)
214    }
215
216    /// Returns the last inserted key.
217    /// If no key has been inserted yet, or the block was just
218    /// flushed, this function returns "".
219    #[inline(always)]
220    pub(crate) fn last_inserted_key(&self) -> &[u8] {
221        &self.previous_key[..]
222    }
223
224    /// Inserts a `(key, value)` pair in the term dictionary.
225    /// Keys have to be inserted in order.
226    ///
227    /// # Panics
228    ///
229    /// Will panics if keys are inserted in an invalid order.
230    #[inline]
231    pub fn insert<K: AsRef<[u8]>>(
232        &mut self,
233        key: K,
234        value: &TValueWriter::Value,
235    ) -> io::Result<()> {
236        self.insert_key(key.as_ref())?;
237        self.insert_value(value)?;
238        Ok(())
239    }
240
241    /// # Warning
242    ///
243    /// Horribly dangerous internal API. See `.insert(...)`.
244    #[doc(hidden)]
245    #[inline]
246    pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
247        // If this is the first key in the block, we use it to
248        // shorten the last term in the last block.
249        if self.first_ordinal_of_the_block == self.num_terms {
250            self.index_builder
251                .shorten_last_block_key_given_next_key(key);
252        }
253        let keep_len = common_prefix_len(&self.previous_key, key);
254        let add_len = key.len() - keep_len;
255        let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
256            || self.previous_key.is_empty()
257            || self.previous_key[keep_len] < key[keep_len];
258        assert!(
259            increasing_keys,
260            "Keys should be increasing. ({:?} > {key:?})",
261            self.previous_key
262        );
263        self.previous_key.resize(key.len(), 0u8);
264        self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
265        self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
266        Ok(())
267    }
268
269    /// # Warning
270    ///
271    /// Horribly dangerous internal API. See `.insert(...)`.
272    #[doc(hidden)]
273    #[inline]
274    pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
275        self.delta_writer.write_value(value);
276        self.num_terms += 1u64;
277        self.flush_block_if_required()
278    }
279
280    pub fn flush_block_if_required(&mut self) -> io::Result<()> {
281        if let Some(byte_range) = self.delta_writer.flush_block_if_required()? {
282            self.index_builder.add_block(
283                &self.previous_key[..],
284                byte_range,
285                self.first_ordinal_of_the_block,
286            );
287            self.first_ordinal_of_the_block = self.num_terms;
288            self.previous_key.clear();
289        }
290        Ok(())
291    }
292
293    pub fn finish(mut self) -> io::Result<W> {
294        if let Some(byte_range) = self.delta_writer.flush_block()? {
295            self.index_builder.add_block(
296                &self.previous_key[..],
297                byte_range,
298                self.first_ordinal_of_the_block,
299            );
300            self.first_ordinal_of_the_block = self.num_terms;
301        }
302        let mut wrt = self.delta_writer.finish();
303        // add a final empty block as an end marker
304        wrt.write_all(&0u32.to_le_bytes())?;
305
306        let offset = wrt.written_bytes();
307
308        let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
309        wrt.write_all(&fst_len.to_le_bytes())?;
310        wrt.write_all(&offset.to_le_bytes())?;
311        wrt.write_all(&self.num_terms.to_le_bytes())?;
312
313        SSTABLE_VERSION.serialize(&mut wrt)?;
314
315        let wrt = wrt.finish();
316        Ok(wrt.into_inner()?)
317    }
318}
319
320#[cfg(test)]
321mod test {
322    use std::io;
323    use std::ops::Bound;
324
325    use common::OwnedBytes;
326
327    use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable};
328
329    fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
330        assert_eq!(
331            common_prefix_len(left.as_bytes(), right.as_bytes()),
332            expect_len
333        );
334        assert_eq!(
335            common_prefix_len(right.as_bytes(), left.as_bytes()),
336            expect_len
337        );
338    }
339
340    #[test]
341    fn test_common_prefix_len() {
342        aux_test_common_prefix_len("a", "ab", 1);
343        aux_test_common_prefix_len("", "ab", 0);
344        aux_test_common_prefix_len("ab", "abc", 2);
345        aux_test_common_prefix_len("abde", "abce", 2);
346    }
347
348    #[test]
349    fn test_long_key_diff() {
350        let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
351        let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
352        let mut buffer = vec![];
353        {
354            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
355            assert!(sstable_writer.insert(&long_key[..], &()).is_ok());
356            assert!(sstable_writer.insert([0, 3, 4], &()).is_ok());
357            assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
358            assert!(sstable_writer.finish().is_ok());
359        }
360        let buffer = OwnedBytes::new(buffer);
361        let mut sstable_reader = VoidSSTable::reader(buffer);
362        assert!(sstable_reader.advance().unwrap());
363        assert_eq!(sstable_reader.key(), &long_key[..]);
364        assert!(sstable_reader.advance().unwrap());
365        assert_eq!(sstable_reader.key(), &[0, 3, 4]);
366        assert!(sstable_reader.advance().unwrap());
367        assert_eq!(sstable_reader.key(), &long_key2[..]);
368        assert!(!sstable_reader.advance().unwrap());
369    }
370
371    #[test]
372    fn test_simple_sstable() {
373        let mut buffer = vec![];
374        {
375            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
376            assert!(sstable_writer.insert([17u8], &()).is_ok());
377            assert!(sstable_writer.insert([17u8, 18u8, 19u8], &()).is_ok());
378            assert!(sstable_writer.insert([17u8, 20u8], &()).is_ok());
379            assert!(sstable_writer.finish().is_ok());
380        }
381        assert_eq!(
382            &buffer,
383            &[
384                // block
385                8, 0, 0, 0, // size of block
386                0, // compression
387                16, 17, 33, 18, 19, 17, 20, // data block
388                0, 0, 0, 0, // no more block
389                // index
390                0, 0, 0, 0, 0, 0, 0, 0, // fst lenght
391                16, 0, 0, 0, 0, 0, 0, 0, // index start offset
392                3, 0, 0, 0, 0, 0, 0, 0, // num term
393                3, 0, 0, 0, // version
394            ]
395        );
396        let buffer = OwnedBytes::new(buffer);
397        let mut sstable_reader = VoidSSTable::reader(buffer);
398        assert!(sstable_reader.advance().unwrap());
399        assert_eq!(sstable_reader.key(), &[17u8]);
400        assert!(sstable_reader.advance().unwrap());
401        assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
402        assert!(sstable_reader.advance().unwrap());
403        assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
404        assert!(!sstable_reader.advance().unwrap());
405    }
406
407    #[test]
408    #[should_panic]
409    fn test_simple_sstable_non_increasing_key() {
410        let mut buffer = vec![];
411        let mut sstable_writer = VoidSSTable::writer(&mut buffer);
412        assert!(sstable_writer.insert([17u8], &()).is_ok());
413        assert!(sstable_writer.insert([16u8], &()).is_ok());
414    }
415
416    #[test]
417    fn test_merge_abcd_abe() {
418        let mut buffer = Vec::new();
419        {
420            let mut writer = VoidSSTable::writer(&mut buffer);
421            writer.insert(b"abcd", &()).unwrap();
422            writer.insert(b"abe", &()).unwrap();
423            writer.finish().unwrap();
424        }
425        let buffer = OwnedBytes::new(buffer);
426        let mut output = Vec::new();
427        assert!(
428            VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
429                .is_ok()
430        );
431        assert_eq!(&output[..], &buffer[..]);
432    }
433
434    #[test]
435    fn test_sstable() {
436        let mut buffer = Vec::new();
437        {
438            let mut writer = VoidSSTable::writer(&mut buffer);
439            assert_eq!(writer.last_inserted_key(), b"");
440            writer.insert(b"abcd", &()).unwrap();
441            assert_eq!(writer.last_inserted_key(), b"abcd");
442            writer.insert(b"abe", &()).unwrap();
443            assert_eq!(writer.last_inserted_key(), b"abe");
444            writer.finish().unwrap();
445        }
446        let buffer = OwnedBytes::new(buffer);
447        let mut output = Vec::new();
448        assert!(
449            VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
450                .is_ok()
451        );
452        assert_eq!(&output[..], &buffer[..]);
453    }
454
455    #[test]
456    fn test_sstable_u64() -> io::Result<()> {
457        let mut buffer = Vec::new();
458        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
459        writer.insert(b"abcd", &1u64)?;
460        writer.insert(b"abe", &4u64)?;
461        writer.insert(b"gogo", &4324234234234234u64)?;
462        writer.finish()?;
463        let buffer = OwnedBytes::new(buffer);
464        let mut reader = MonotonicU64SSTable::reader(buffer);
465        assert!(reader.advance()?);
466        assert_eq!(reader.key(), b"abcd");
467        assert_eq!(reader.value(), &1u64);
468        assert!(reader.advance()?);
469        assert_eq!(reader.key(), b"abe");
470        assert_eq!(reader.value(), &4u64);
471        assert!(reader.advance()?);
472        assert_eq!(reader.key(), b"gogo");
473        assert_eq!(reader.value(), &4324234234234234u64);
474        assert!(!reader.advance()?);
475        Ok(())
476    }
477
478    #[test]
479    fn test_sstable_empty() {
480        let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader();
481        assert!(!sstable_range_empty.advance().unwrap());
482    }
483
484    use common::file_slice::FileSlice;
485    use proptest::prelude::*;
486
487    use crate::Dictionary;
488
489    fn bound_strategy() -> impl Strategy<Value = Bound<String>> {
490        prop_oneof![
491            Just(Bound::<String>::Unbounded),
492            "[a-c]{0,5}".prop_map(Bound::Included),
493            "[a-c]{0,5}".prop_map(Bound::Excluded),
494        ]
495    }
496
497    fn extract_key(bound: Bound<&String>) -> Option<&str> {
498        match bound.as_ref() {
499            Bound::Included(key) => Some(key.as_str()),
500            Bound::Excluded(key) => Some(key.as_str()),
501            Bound::Unbounded => None,
502        }
503    }
504
505    fn bounds_strategy() -> impl Strategy<Value = (Bound<String>, Bound<String>)> {
506        (bound_strategy(), bound_strategy()).prop_filter(
507            "Lower bound <= Upper bound",
508            |(left, right)| match (extract_key(left.as_ref()), extract_key(right.as_ref())) {
509                (None, _) => true,
510                (_, None) => true,
511                (left, right) => left < right,
512            },
513        )
514    }
515
    // Property test: streaming a bounded range out of a dictionary must yield
    // exactly the keys that `BTreeSet::range` yields for the same bounds.
    proptest! {
        #[test]
        fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
            (lower_bound, upper_bound) in bounds_strategy(),
        ) {
            // Build a dictionary holding every generated word, with a small
            // block length (16) set on the builder.
            let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
            builder.set_block_len(16);
            for word in &words {
                builder.insert(word.as_bytes(), &()).unwrap();
            }
            let buffer: Vec<u8> = builder.finish().unwrap();
            let dictionary: Dictionary<VoidSSTable> = Dictionary::open(FileSlice::from(buffer)).unwrap();
            // Translate the generated bounds into range-builder calls.
            let mut range_builder = dictionary.range();
            range_builder = match lower_bound.as_ref() {
                Bound::Included(key) => range_builder.ge(key.as_bytes()),
                Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            range_builder = match upper_bound.as_ref() {
                Bound::Included(key) => range_builder.le(key.as_bytes()),
                Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            let mut stream = range_builder.into_stream().unwrap();
            // Reference result: the same bounds applied to the sorted word set.
            let mut btree_set_range = words.range((lower_bound, upper_bound));
            while stream.advance() {
                // The stream must not yield more keys than the reference...
                let val = btree_set_range.next().unwrap();
                assert_eq!(val.as_bytes(), stream.key());
            }
            // ...nor fewer.
            assert!(btree_set_range.next().is_none());
        }
    }
548}