tantivy_sstable/
lib.rs

1//! `tantivy_sstable` is a crate that provides a sorted string table data structure.
2//!
3//! It is used in `tantivy` to store the term dictionary.
4//!
5//! A `sstable` is a map of sorted `&[u8]` keys to values.
6//! The keys are encoded using incremental encoding.
7//!
8//! Values and keys are compressed using zstd with the default feature flag `zstd-compression`.
9//!
10//! # Example
11//!
12//! Here is an example of how to create and search an `sstable`:
13//!
14//! ```rust
15//! use common::OwnedBytes;
16//! use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
17//!
18//! // Create a new sstable in memory.
19//! let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
20//! builder.insert(b"apple", &1).unwrap();
21//! builder.insert(b"banana", &2).unwrap();
22//! builder.insert(b"orange", &3).unwrap();
23//! let sstable_bytes = builder.finish().unwrap();
24//!
25//! // Open the sstable.
26//! let sstable =
27//!     Dictionary::<MonotonicU64SSTable>::from_bytes(OwnedBytes::new(sstable_bytes)).unwrap();
28//!
29//! // Search for a key.
30//! let value = sstable.get(b"banana").unwrap();
31//! assert_eq!(value, Some(2));
32//!
33//! // Search for a non-existent key.
34//! let value = sstable.get(b"grape").unwrap();
35//! assert_eq!(value, None);
36//! ```
37
38use std::io::{self, Write};
39use std::ops::Range;
40
41use merge::ValueMerger;
42
43mod block_match_automaton;
44mod delta;
45mod dictionary;
46pub mod merge;
47mod streamer;
48pub mod value;
49
50mod sstable_index_v3;
51pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
52mod sstable_index_v2;
53pub(crate) mod vint;
54pub use dictionary::Dictionary;
55pub use streamer::{Streamer, StreamerBuilder};
56
57mod block_reader;
58use common::{BinarySerializable, OwnedBytes};
59use value::{VecU32ValueReader, VecU32ValueWriter};
60
61pub use self::block_reader::BlockReader;
62pub use self::delta::{DeltaReader, DeltaWriter};
63pub use self::merge::VoidMerge;
64use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter};
65use crate::value::{RangeValueReader, RangeValueWriter};
66
/// Alias for `u64`.
///
/// NOTE(review): presumably the rank of a term in the sorted dictionary; this file does not
/// use the alias itself, so confirm against the callers.
pub type TermOrdinal = u64;

/// Initial capacity, in bytes, of the key buffers allocated by `SSTable::reader` and
/// `Writer::new`.
const DEFAULT_KEY_CAPACITY: usize = 50;
/// Format version number that `Writer::finish` serializes at the very end of its output.
const SSTABLE_VERSION: u32 = 3;
71
/// Returns the length of the longest common prefix of two byte strings.
///
/// The result is the index of the first position at which `left` and `right`
/// differ, or the length of the shorter input if one is a prefix of the other.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    left.iter()
        .zip(right)
        .position(|(left_byte, right_byte)| left_byte != right_byte)
        // No mismatch found: the shorter slice is entirely a prefix of the longer one.
        .unwrap_or_else(|| left.len().min(right.len()))
}
81
/// Unit type named after corrupted sstable data.
///
/// NOTE(review): nothing in this file constructs or returns it; presumably it is used as an
/// error marker by other modules of the crate. Confirm before relying on it.
#[derive(Clone, Copy, Debug)]
pub struct SSTableDataCorruption;
84
85/// SSTable makes it possible to read and write
86/// sstables with typed values.
87pub trait SSTable: Sized {
88    type Value: Clone;
89    type ValueReader: ValueReader<Value = Self::Value>;
90    type ValueWriter: ValueWriter<Value = Self::Value>;
91
92    fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::ValueWriter> {
93        DeltaWriter::new(write)
94    }
95
96    fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
97        Writer::new(wrt)
98    }
99
100    fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
101        DeltaReader::new(reader)
102    }
103
104    fn reader(reader: OwnedBytes) -> Reader<Self::ValueReader> {
105        Reader {
106            key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
107            delta_reader: Self::delta_reader(reader),
108        }
109    }
110
111    /// Returns an empty static reader.
112    fn create_empty_reader() -> Reader<Self::ValueReader> {
113        Self::reader(OwnedBytes::empty())
114    }
115
116    fn merge<W: io::Write, M: ValueMerger<Self::Value>>(
117        io_readers: Vec<OwnedBytes>,
118        w: W,
119        merger: M,
120    ) -> io::Result<()> {
121        let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
122        let writer = Self::writer(w);
123        merge::merge_sstable::<Self, _, _>(readers, writer, merger)
124    }
125}
126
127pub struct VoidSSTable;
128
129impl SSTable for VoidSSTable {
130    type Value = ();
131    type ValueReader = value::VoidValueReader;
132    type ValueWriter = value::VoidValueWriter;
133}
134
135/// SSTable associated keys to u64
136/// sorted in order.
137///
138/// In other words, two keys `k1` and `k2`
139/// such that `k1` <= `k2`, are required to observe
140/// `range_sstable[k1] <= range_sstable[k2]`.
141pub struct MonotonicU64SSTable;
142
143impl SSTable for MonotonicU64SSTable {
144    type Value = u64;
145
146    type ValueReader = U64MonotonicValueReader;
147
148    type ValueWriter = U64MonotonicValueWriter;
149}
150
151/// SSTable associating keys to ranges.
152/// The range are required to partition the
153/// space.
154///
155/// In other words, two consecutive keys `k1` and `k2`
156/// are required to observe
157/// `range_sstable[k1].end == range_sstable[k2].start`.
158///
159/// The first range is not required to start at `0`.
160#[derive(Clone, Copy, Debug)]
161pub struct RangeSSTable;
162
163impl SSTable for RangeSSTable {
164    type Value = Range<u64>;
165
166    type ValueReader = RangeValueReader;
167
168    type ValueWriter = RangeValueWriter;
169}
170
171/// SSTable associating keys to Vec<u32>.
172pub struct VecU32ValueSSTable;
173
174impl SSTable for VecU32ValueSSTable {
175    type Value = Vec<u32>;
176    type ValueReader = VecU32ValueReader;
177    type ValueWriter = VecU32ValueWriter;
178}
179
180/// SSTable reader.
181pub struct Reader<TValueReader> {
182    key: Vec<u8>,
183    delta_reader: DeltaReader<TValueReader>,
184}
185
186impl<TValueReader> Reader<TValueReader>
187where TValueReader: ValueReader
188{
189    pub fn advance(&mut self) -> io::Result<bool> {
190        if !self.delta_reader.advance()? {
191            return Ok(false);
192        }
193        let common_prefix_len = self.delta_reader.common_prefix_len();
194        let suffix = self.delta_reader.suffix();
195        let new_len = self.delta_reader.common_prefix_len() + suffix.len();
196        self.key.resize(new_len, 0u8);
197        self.key[common_prefix_len..].copy_from_slice(suffix);
198        Ok(true)
199    }
200
201    #[inline(always)]
202    pub fn key(&self) -> &[u8] {
203        &self.key
204    }
205
206    #[inline(always)]
207    pub fn value(&self) -> &TValueReader::Value {
208        self.delta_reader.value()
209    }
210}
211
212impl<TValueReader> AsRef<[u8]> for Reader<TValueReader> {
213    #[inline(always)]
214    fn as_ref(&self) -> &[u8] {
215        &self.key
216    }
217}
218
219pub struct Writer<W, TValueWriter>
220where W: io::Write
221{
222    previous_key: Vec<u8>,
223    index_builder: SSTableIndexBuilder,
224    delta_writer: DeltaWriter<W, TValueWriter>,
225    num_terms: u64,
226    first_ordinal_of_the_block: u64,
227}
228
229impl<W, TValueWriter> Writer<W, TValueWriter>
230where
231    W: io::Write,
232    TValueWriter: value::ValueWriter,
233{
234    /// Use `Self::new`. This method only exists to match its
235    /// equivalent in fst.
236    /// TODO remove this function. (See Issue #1727)
237    #[doc(hidden)]
238    pub fn create(wrt: W) -> io::Result<Self> {
239        Ok(Self::new(wrt))
240    }
241
242    /// Creates a new `TermDictionaryBuilder`.
243    pub fn new(wrt: W) -> Self {
244        Writer {
245            previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
246            num_terms: 0u64,
247            index_builder: SSTableIndexBuilder::default(),
248            delta_writer: DeltaWriter::new(wrt),
249            first_ordinal_of_the_block: 0u64,
250        }
251    }
252
253    /// Set the target block length.
254    ///
255    /// The delta part of a block will generally be slightly larger than the requested `block_len`,
256    /// however this does not account for the length of the Value part of the table.
257    pub fn set_block_len(&mut self, block_len: usize) {
258        self.delta_writer.set_block_len(block_len)
259    }
260
261    /// Returns the last inserted key.
262    /// If no key has been inserted yet, or the block was just
263    /// flushed, this function returns "".
264    #[inline(always)]
265    pub(crate) fn last_inserted_key(&self) -> &[u8] {
266        &self.previous_key[..]
267    }
268
269    /// Inserts a `(key, value)` pair in the term dictionary.
270    /// Keys have to be inserted in order.
271    ///
272    /// # Panics
273    ///
274    /// Will panics if keys are inserted in an invalid order.
275    #[inline]
276    pub fn insert<K: AsRef<[u8]>>(
277        &mut self,
278        key: K,
279        value: &TValueWriter::Value,
280    ) -> io::Result<()> {
281        self.insert_key(key.as_ref())?;
282        self.insert_value(value)?;
283        Ok(())
284    }
285
286    /// # Warning
287    ///
288    /// Horribly dangerous internal API. See `.insert(...)`.
289    #[doc(hidden)]
290    #[inline]
291    pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
292        // If this is the first key in the block, we use it to
293        // shorten the last term in the last block.
294        if self.first_ordinal_of_the_block == self.num_terms {
295            self.index_builder
296                .shorten_last_block_key_given_next_key(key);
297        }
298        let keep_len = common_prefix_len(&self.previous_key, key);
299        let add_len = key.len() - keep_len;
300        let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
301            || self.previous_key.is_empty()
302            || self.previous_key[keep_len] < key[keep_len];
303        assert!(
304            increasing_keys,
305            "Keys should be increasing. ({:?} > {key:?})",
306            self.previous_key
307        );
308        self.previous_key.resize(key.len(), 0u8);
309        self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
310        self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
311        Ok(())
312    }
313
314    /// # Warning
315    ///
316    /// Horribly dangerous internal API. See `.insert(...)`.
317    #[doc(hidden)]
318    #[inline]
319    pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
320        self.delta_writer.write_value(value);
321        self.num_terms += 1u64;
322        self.flush_block_if_required()
323    }
324
325    pub fn flush_block_if_required(&mut self) -> io::Result<()> {
326        if let Some(byte_range) = self.delta_writer.flush_block_if_required()? {
327            self.index_builder.add_block(
328                &self.previous_key[..],
329                byte_range,
330                self.first_ordinal_of_the_block,
331            );
332            self.first_ordinal_of_the_block = self.num_terms;
333            self.previous_key.clear();
334        }
335        Ok(())
336    }
337
338    pub fn finish(mut self) -> io::Result<W> {
339        if let Some(byte_range) = self.delta_writer.flush_block()? {
340            self.index_builder.add_block(
341                &self.previous_key[..],
342                byte_range,
343                self.first_ordinal_of_the_block,
344            );
345            self.first_ordinal_of_the_block = self.num_terms;
346        }
347        let mut wrt = self.delta_writer.finish();
348        // add a final empty block as an end marker
349        wrt.write_all(&0u32.to_le_bytes())?;
350
351        let offset = wrt.written_bytes();
352
353        let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
354        wrt.write_all(&fst_len.to_le_bytes())?;
355        wrt.write_all(&offset.to_le_bytes())?;
356        wrt.write_all(&self.num_terms.to_le_bytes())?;
357
358        SSTABLE_VERSION.serialize(&mut wrt)?;
359
360        let wrt = wrt.finish();
361        Ok(wrt.into_inner()?)
362    }
363}
364
#[cfg(test)]
mod test {
    use std::io;
    use std::ops::Bound;

    use common::OwnedBytes;
    use common::file_slice::FileSlice;
    use proptest::prelude::*;

    use super::{MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable, common_prefix_len};
    use crate::Dictionary;

    /// Checks `common_prefix_len` in both argument orders.
    fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
        let (left_bytes, right_bytes) = (left.as_bytes(), right.as_bytes());
        assert_eq!(common_prefix_len(left_bytes, right_bytes), expect_len);
        assert_eq!(common_prefix_len(right_bytes, left_bytes), expect_len);
    }

    #[test]
    fn test_common_prefix_len() {
        aux_test_common_prefix_len("a", "ab", 1);
        aux_test_common_prefix_len("", "ab", 0);
        aux_test_common_prefix_len("ab", "abc", 2);
        aux_test_common_prefix_len("abde", "abce", 2);
    }

    #[test]
    fn test_long_key_diff() {
        let long_key: Vec<u8> = (0..1_024).map(|x| (x % 255) as u8).collect();
        let short_key: [u8; 3] = [0, 3, 4];
        let long_key2: Vec<u8> = (1..300).map(|x| (x % 255) as u8).collect();
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&long_key[..], &()).is_ok());
            assert!(sstable_writer.insert(short_key, &()).is_ok());
            assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        let mut sstable_reader = VoidSSTable::reader(OwnedBytes::new(buffer));
        // The keys must come back in insertion order.
        for expected_key in [&long_key[..], &short_key[..], &long_key2[..]] {
            assert!(sstable_reader.advance().unwrap());
            assert_eq!(sstable_reader.key(), expected_key);
        }
        assert!(!sstable_reader.advance().unwrap());
    }

    #[test]
    fn test_simple_sstable() {
        let keys: [&[u8]; 3] = [&[17u8], &[17u8, 18u8, 19u8], &[17u8, 20u8]];
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            for key in keys {
                assert!(sstable_writer.insert(key, &()).is_ok());
            }
            assert!(sstable_writer.finish().is_ok());
        }
        assert_eq!(
            &buffer,
            &[
                // block
                8, 0, 0, 0, // size of block
                0, // compression
                16, 17, 33, 18, 19, 17, 20, // data block
                0, 0, 0, 0, // no more block
                // index
                0, 0, 0, 0, 0, 0, 0, 0, // fst length
                16, 0, 0, 0, 0, 0, 0, 0, // index start offset
                3, 0, 0, 0, 0, 0, 0, 0, // num term
                3, 0, 0, 0, // version
            ]
        );
        let mut sstable_reader = VoidSSTable::reader(OwnedBytes::new(buffer));
        for expected_key in keys {
            assert!(sstable_reader.advance().unwrap());
            assert_eq!(sstable_reader.key(), expected_key);
        }
        assert!(!sstable_reader.advance().unwrap());
    }

    #[test]
    #[should_panic]
    fn test_simple_sstable_non_increasing_key() {
        let mut buffer = vec![];
        let mut sstable_writer = VoidSSTable::writer(&mut buffer);
        assert!(sstable_writer.insert([17u8], &()).is_ok());
        // Going backwards must trip the ordering assertion.
        assert!(sstable_writer.insert([16u8], &()).is_ok());
    }

    #[test]
    fn test_merge_abcd_abe() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            writer.insert(b"abcd", &()).unwrap();
            writer.insert(b"abe", &()).unwrap();
            writer.finish().unwrap();
        }
        let buffer = OwnedBytes::new(buffer);
        let inputs = vec![buffer.clone(), buffer.clone()];
        let mut output = Vec::new();
        // Merging a table with itself must reproduce the table byte for byte.
        assert!(VoidSSTable::merge(inputs, &mut output, VoidMerge).is_ok());
        assert_eq!(&output[..], &buffer[..]);
    }

    #[test]
    fn test_sstable() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            assert_eq!(writer.last_inserted_key(), b"");
            writer.insert(b"abcd", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abcd");
            writer.insert(b"abe", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abe");
            writer.finish().unwrap();
        }
        let buffer = OwnedBytes::new(buffer);
        let inputs = vec![buffer.clone(), buffer.clone()];
        let mut output = Vec::new();
        assert!(VoidSSTable::merge(inputs, &mut output, VoidMerge).is_ok());
        assert_eq!(&output[..], &buffer[..]);
    }

    #[test]
    fn test_sstable_u64() -> io::Result<()> {
        let entries: [(&[u8], u64); 3] = [
            (&b"abcd"[..], 1u64),
            (&b"abe"[..], 4u64),
            (&b"gogo"[..], 4324234234234234u64),
        ];
        let mut buffer = Vec::new();
        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
        for &(key, value) in &entries {
            writer.insert(key, &value)?;
        }
        writer.finish()?;
        let mut reader = MonotonicU64SSTable::reader(OwnedBytes::new(buffer));
        for &(key, value) in &entries {
            assert!(reader.advance()?);
            assert_eq!(reader.key(), key);
            assert_eq!(reader.value(), &value);
        }
        assert!(!reader.advance()?);
        Ok(())
    }

    #[test]
    fn test_sstable_empty() {
        let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader();
        assert!(!sstable_range_empty.advance().unwrap());
    }

    /// Generates an arbitrary bound over short words drawn from `[a-c]`.
    fn bound_strategy() -> impl Strategy<Value = Bound<String>> {
        prop_oneof![
            Just(Bound::<String>::Unbounded),
            "[a-c]{0,5}".prop_map(Bound::Included),
            "[a-c]{0,5}".prop_map(Bound::Excluded),
        ]
    }

    /// Returns the key carried by a bound, if any.
    fn extract_key(bound: Bound<&String>) -> Option<&str> {
        match bound {
            Bound::Included(key) | Bound::Excluded(key) => Some(key.as_str()),
            Bound::Unbounded => None,
        }
    }

    /// Generates a (lower, upper) pair of bounds, discarding pairs where both
    /// bounds carry a key and the lower key is not strictly below the upper key.
    fn bounds_strategy() -> impl Strategy<Value = (Bound<String>, Bound<String>)> {
        (bound_strategy(), bound_strategy()).prop_filter(
            "Lower bound <= Upper bound",
            |(lower, upper)| {
                let lower_key = extract_key(lower.as_ref());
                let upper_key = extract_key(upper.as_ref());
                match (lower_key, upper_key) {
                    (None, _) | (_, None) => true,
                    (lower_key, upper_key) => lower_key < upper_key,
                }
            },
        )
    }

    proptest! {
        #[test]
        fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
            (lower_bound, upper_bound) in bounds_strategy(),
        ) {
            // Small blocks so that ranges routinely span several of them.
            let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
            builder.set_block_len(16);
            for word in &words {
                builder.insert(word.as_bytes(), &()).unwrap();
            }
            let buffer: Vec<u8> = builder.finish().unwrap();
            let dictionary: Dictionary<VoidSSTable> = Dictionary::open(FileSlice::from(buffer)).unwrap();
            let mut range_builder = dictionary.range();
            range_builder = match lower_bound.as_ref() {
                Bound::Unbounded => range_builder,
                Bound::Included(key) => range_builder.ge(key.as_bytes()),
                Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
            };
            range_builder = match upper_bound.as_ref() {
                Bound::Unbounded => range_builder,
                Bound::Included(key) => range_builder.le(key.as_bytes()),
                Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
            };
            let mut stream = range_builder.into_stream().unwrap();
            // The `BTreeSet` range over the same bounds is the reference result.
            let mut expected_words = words.range((lower_bound, upper_bound));
            while stream.advance() {
                let expected_word = expected_words.next().unwrap();
                assert_eq!(expected_word.as_bytes(), stream.key());
            }
            assert!(expected_words.next().is_none());
        }
    }
}