summavy_sstable/
dictionary.rs

1use std::io;
2use std::marker::PhantomData;
3use std::ops::{Bound, RangeBounds};
4use std::sync::Arc;
5
6use common::file_slice::FileSlice;
7use common::{BinarySerializable, OwnedBytes};
8use tantivy_fst::automaton::AlwaysMatch;
9use tantivy_fst::Automaton;
10
11use crate::streamer::{Streamer, StreamerBuilder};
12use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
13
14/// An SSTable is a sorted map that associates sorted `&[u8]` keys
15/// to any kind of typed values.
16///
17/// The SSTable is organized in blocks.
18/// In each block, keys and values are encoded separately.
19///
20/// The keys are encoded using incremental encoding.
21/// The values on the other hand, are encoded according to a value-specific
22/// codec defined in the TSSTable generic argument.
23///
24/// Finally, an index is joined to the Dictionary to make it possible,
25/// given a key to identify which block contains this key.
26///
27/// The codec was designed in such a way that the sstable
28/// reader is not aware of block, and yet can read any sequence of blocks,
29/// as long as the slice of bytes it is given starts and stops at
30/// block boundary.
31///
32/// (See also README.md)
33pub struct Dictionary<TSSTable: SSTable> {
34    pub sstable_slice: FileSlice,
35    pub sstable_index: SSTableIndex,
36    num_terms: u64,
37    phantom_data: PhantomData<TSSTable>,
38}
39
40impl<TSSTable: SSTable> Dictionary<TSSTable> {
41    pub fn builder<W: io::Write>(wrt: W) -> io::Result<crate::Writer<W, TSSTable::ValueWriter>> {
42        Ok(TSSTable::writer(wrt))
43    }
44
45    pub(crate) fn sstable_reader(&self) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
46        let data = self.sstable_slice.read_bytes()?;
47        Ok(TSSTable::reader(data))
48    }
49
50    pub(crate) fn sstable_reader_block(
51        &self,
52        block_addr: BlockAddr,
53    ) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
54        let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?;
55        Ok(TSSTable::reader(data))
56    }
57
58    pub(crate) async fn sstable_reader_block_async(
59        &self,
60        block_addr: BlockAddr,
61    ) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
62        let data = self
63            .sstable_slice
64            .read_bytes_slice_async(block_addr.byte_range)
65            .await?;
66        Ok(TSSTable::reader(data))
67    }
68
69    pub(crate) fn sstable_delta_reader_for_key_range(
70        &self,
71        key_range: impl RangeBounds<[u8]>,
72    ) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
73        let slice = self.file_slice_for_range(key_range);
74        let data = slice.read_bytes()?;
75        Ok(TSSTable::delta_reader(data))
76    }
77
78    /// This function returns a file slice covering a set of sstable blocks
79    /// that include the key range passed in arguments.
80    ///
81    /// It works by identifying
82    /// - `first_block`: the block containing the start boudary key
83    /// - `last_block`: the block containing the end boundary key.
84    ///
85    /// And then returning the range that spans over all blocks between.
86    /// and including first_block and last_block, aka:
87    /// `[first_block.start_offset .. last_block.end_offset)`
88    ///
89    /// Technically this function does not provide the tightest fit, as
90    /// for simplification, it treats the start bound of the `key_range`
91    /// as if it was inclusive, even if it is exclusive.
92    /// On the rare edge case where a user asks for `(start_key, end_key]`
93    /// and `start_key` happens to be the last key of a block, we return a
94    /// slice that is the first block was not necessary.
95    fn file_slice_for_range(&self, key_range: impl RangeBounds<[u8]>) -> FileSlice {
96        let start_bound: Bound<usize> = match key_range.start_bound() {
97            Bound::Included(key) | Bound::Excluded(key) => {
98                let Some(first_block_addr) = self.sstable_index.search_block(key) else {
99                    return FileSlice::empty();
100                };
101                Bound::Included(first_block_addr.byte_range.start)
102            }
103            Bound::Unbounded => Bound::Unbounded,
104        };
105        let end_bound: Bound<usize> = match key_range.end_bound() {
106            Bound::Included(key) | Bound::Excluded(key) => {
107                if let Some(block_addr) = self.sstable_index.search_block(key) {
108                    Bound::Excluded(block_addr.byte_range.end)
109                } else {
110                    Bound::Unbounded
111                }
112            }
113            Bound::Unbounded => Bound::Unbounded,
114        };
115        self.sstable_slice.slice((start_bound, end_bound))
116    }
117
118    /// Opens a `TermDictionary`.
119    pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
120        let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
121        let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
122        let index_offset = u64::deserialize(&mut footer_len_bytes)?;
123        let num_terms = u64::deserialize(&mut footer_len_bytes)?;
124        let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
125        let sstable_index_bytes = index_slice.read_bytes()?;
126        let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())
127            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
128        Ok(Dictionary {
129            sstable_slice,
130            sstable_index,
131            num_terms,
132            phantom_data: PhantomData,
133        })
134    }
135
136    /// Creates a term dictionary from the supplied bytes.
137    pub fn from_bytes(owned_bytes: OwnedBytes) -> io::Result<Self> {
138        Dictionary::open(FileSlice::new(Arc::new(owned_bytes)))
139    }
140
141    /// Creates an empty term dictionary which contains no terms.
142    pub fn empty() -> Self {
143        let term_dictionary_data: Vec<u8> = Self::builder(Vec::<u8>::new())
144            .expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
145            .finish()
146            .expect("Writing in a Vec<u8> should never fail");
147        let empty_dict_file = FileSlice::from(term_dictionary_data);
148        Dictionary::open(empty_dict_file).unwrap()
149    }
150
151    /// Returns the number of terms in the dictionary.
152    /// Term ordinals range from 0 to `num_terms() - 1`.
153    pub fn num_terms(&self) -> usize {
154        self.num_terms as usize
155    }
156
157    /// Returns the ordinal associated with a given term.
158    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
159        let mut term_ord = 0u64;
160        let key_bytes = key.as_ref();
161        let mut sstable_reader = self.sstable_reader()?;
162        while sstable_reader.advance().unwrap_or(false) {
163            if sstable_reader.key() == key_bytes {
164                return Ok(Some(term_ord));
165            }
166            term_ord += 1;
167        }
168        Ok(None)
169    }
170
171    /// Returns the term associated with a given term ordinal.
172    ///
173    /// Term ordinals are defined as the position of the term in
174    /// the sorted list of terms.
175    ///
176    /// Returns true if and only if the term has been found.
177    ///
178    /// Regardless of whether the term is found or not,
179    /// the buffer may be modified.
180    pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
181        let mut sstable_reader = self.sstable_reader()?;
182        bytes.clear();
183        for _ in 0..(ord + 1) {
184            if !sstable_reader.advance().unwrap_or(false) {
185                return Ok(false);
186            }
187        }
188        bytes.extend_from_slice(sstable_reader.key());
189        Ok(true)
190    }
191
192    /// Returns the number of terms in the dictionary.
193    pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
194        let mut sstable_reader = self.sstable_reader()?;
195        for _ in 0..(term_ord + 1) {
196            if !sstable_reader.advance().unwrap_or(false) {
197                return Ok(None);
198            }
199        }
200        Ok(Some(sstable_reader.value().clone()))
201    }
202
203    /// Lookups the value corresponding to the key.
204    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
205        if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) {
206            let mut sstable_reader = self.sstable_reader_block(block_addr)?;
207            let key_bytes = key.as_ref();
208            while sstable_reader.advance().unwrap_or(false) {
209                if sstable_reader.key() == key_bytes {
210                    let value = sstable_reader.value().clone();
211                    return Ok(Some(value));
212                }
213            }
214        }
215        Ok(None)
216    }
217
218    /// Lookups the value corresponding to the key.
219    pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
220        if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) {
221            let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?;
222            let key_bytes = key.as_ref();
223            while sstable_reader.advance().unwrap_or(false) {
224                if sstable_reader.key() == key_bytes {
225                    let value = sstable_reader.value().clone();
226                    return Ok(Some(value));
227                }
228            }
229        }
230        Ok(None)
231    }
232
233    /// Returns a range builder, to stream all of the terms
234    /// within an interval.
235    pub fn range(&self) -> StreamerBuilder<'_, TSSTable> {
236        StreamerBuilder::new(self, AlwaysMatch)
237    }
238
239    /// A stream of all the sorted terms.
240    pub fn stream(&self) -> io::Result<Streamer<'_, TSSTable>> {
241        self.range().into_stream()
242    }
243
244    /// Returns a search builder, to stream all of the terms
245    /// within the Automaton
246    pub fn search<'a, A: Automaton + 'a>(
247        &'a self,
248        automaton: A,
249    ) -> StreamerBuilder<'a, TSSTable, A>
250    where
251        A::State: Clone,
252    {
253        StreamerBuilder::<TSSTable, A>::new(self, automaton)
254    }
255
256    #[doc(hidden)]
257    pub async fn warm_up_dictionary(&self) -> io::Result<()> {
258        self.sstable_slice.read_bytes_async().await?;
259        Ok(())
260    }
261}