summavy_sstable/
dictionary.rs1use std::io;
2use std::marker::PhantomData;
3use std::ops::{Bound, RangeBounds};
4use std::sync::Arc;
5
6use common::file_slice::FileSlice;
7use common::{BinarySerializable, OwnedBytes};
8use tantivy_fst::automaton::AlwaysMatch;
9use tantivy_fst::Automaton;
10
11use crate::streamer::{Streamer, StreamerBuilder};
12use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal};
13
14pub struct Dictionary<TSSTable: SSTable> {
34 pub sstable_slice: FileSlice,
35 pub sstable_index: SSTableIndex,
36 num_terms: u64,
37 phantom_data: PhantomData<TSSTable>,
38}
39
40impl<TSSTable: SSTable> Dictionary<TSSTable> {
41 pub fn builder<W: io::Write>(wrt: W) -> io::Result<crate::Writer<W, TSSTable::ValueWriter>> {
42 Ok(TSSTable::writer(wrt))
43 }
44
45 pub(crate) fn sstable_reader(&self) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
46 let data = self.sstable_slice.read_bytes()?;
47 Ok(TSSTable::reader(data))
48 }
49
50 pub(crate) fn sstable_reader_block(
51 &self,
52 block_addr: BlockAddr,
53 ) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
54 let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?;
55 Ok(TSSTable::reader(data))
56 }
57
58 pub(crate) async fn sstable_reader_block_async(
59 &self,
60 block_addr: BlockAddr,
61 ) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
62 let data = self
63 .sstable_slice
64 .read_bytes_slice_async(block_addr.byte_range)
65 .await?;
66 Ok(TSSTable::reader(data))
67 }
68
69 pub(crate) fn sstable_delta_reader_for_key_range(
70 &self,
71 key_range: impl RangeBounds<[u8]>,
72 ) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
73 let slice = self.file_slice_for_range(key_range);
74 let data = slice.read_bytes()?;
75 Ok(TSSTable::delta_reader(data))
76 }
77
78 fn file_slice_for_range(&self, key_range: impl RangeBounds<[u8]>) -> FileSlice {
96 let start_bound: Bound<usize> = match key_range.start_bound() {
97 Bound::Included(key) | Bound::Excluded(key) => {
98 let Some(first_block_addr) = self.sstable_index.search_block(key) else {
99 return FileSlice::empty();
100 };
101 Bound::Included(first_block_addr.byte_range.start)
102 }
103 Bound::Unbounded => Bound::Unbounded,
104 };
105 let end_bound: Bound<usize> = match key_range.end_bound() {
106 Bound::Included(key) | Bound::Excluded(key) => {
107 if let Some(block_addr) = self.sstable_index.search_block(key) {
108 Bound::Excluded(block_addr.byte_range.end)
109 } else {
110 Bound::Unbounded
111 }
112 }
113 Bound::Unbounded => Bound::Unbounded,
114 };
115 self.sstable_slice.slice((start_bound, end_bound))
116 }
117
118 pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
120 let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
121 let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
122 let index_offset = u64::deserialize(&mut footer_len_bytes)?;
123 let num_terms = u64::deserialize(&mut footer_len_bytes)?;
124 let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
125 let sstable_index_bytes = index_slice.read_bytes()?;
126 let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())
127 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
128 Ok(Dictionary {
129 sstable_slice,
130 sstable_index,
131 num_terms,
132 phantom_data: PhantomData,
133 })
134 }
135
136 pub fn from_bytes(owned_bytes: OwnedBytes) -> io::Result<Self> {
138 Dictionary::open(FileSlice::new(Arc::new(owned_bytes)))
139 }
140
141 pub fn empty() -> Self {
143 let term_dictionary_data: Vec<u8> = Self::builder(Vec::<u8>::new())
144 .expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
145 .finish()
146 .expect("Writing in a Vec<u8> should never fail");
147 let empty_dict_file = FileSlice::from(term_dictionary_data);
148 Dictionary::open(empty_dict_file).unwrap()
149 }
150
151 pub fn num_terms(&self) -> usize {
154 self.num_terms as usize
155 }
156
157 pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
159 let mut term_ord = 0u64;
160 let key_bytes = key.as_ref();
161 let mut sstable_reader = self.sstable_reader()?;
162 while sstable_reader.advance().unwrap_or(false) {
163 if sstable_reader.key() == key_bytes {
164 return Ok(Some(term_ord));
165 }
166 term_ord += 1;
167 }
168 Ok(None)
169 }
170
171 pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
181 let mut sstable_reader = self.sstable_reader()?;
182 bytes.clear();
183 for _ in 0..(ord + 1) {
184 if !sstable_reader.advance().unwrap_or(false) {
185 return Ok(false);
186 }
187 }
188 bytes.extend_from_slice(sstable_reader.key());
189 Ok(true)
190 }
191
192 pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
194 let mut sstable_reader = self.sstable_reader()?;
195 for _ in 0..(term_ord + 1) {
196 if !sstable_reader.advance().unwrap_or(false) {
197 return Ok(None);
198 }
199 }
200 Ok(Some(sstable_reader.value().clone()))
201 }
202
203 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
205 if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) {
206 let mut sstable_reader = self.sstable_reader_block(block_addr)?;
207 let key_bytes = key.as_ref();
208 while sstable_reader.advance().unwrap_or(false) {
209 if sstable_reader.key() == key_bytes {
210 let value = sstable_reader.value().clone();
211 return Ok(Some(value));
212 }
213 }
214 }
215 Ok(None)
216 }
217
218 pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
220 if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) {
221 let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?;
222 let key_bytes = key.as_ref();
223 while sstable_reader.advance().unwrap_or(false) {
224 if sstable_reader.key() == key_bytes {
225 let value = sstable_reader.value().clone();
226 return Ok(Some(value));
227 }
228 }
229 }
230 Ok(None)
231 }
232
233 pub fn range(&self) -> StreamerBuilder<'_, TSSTable> {
236 StreamerBuilder::new(self, AlwaysMatch)
237 }
238
239 pub fn stream(&self) -> io::Result<Streamer<'_, TSSTable>> {
241 self.range().into_stream()
242 }
243
244 pub fn search<'a, A: Automaton + 'a>(
247 &'a self,
248 automaton: A,
249 ) -> StreamerBuilder<'a, TSSTable, A>
250 where
251 A::State: Clone,
252 {
253 StreamerBuilder::<TSSTable, A>::new(self, automaton)
254 }
255
256 #[doc(hidden)]
257 pub async fn warm_up_dictionary(&self) -> io::Result<()> {
258 self.sstable_slice.read_bytes_async().await?;
259 Ok(())
260 }
261}