1use std::io::{self, Write};
2use std::ops::Range;
3use std::usize;
4
5use merge::ValueMerger;
6
7mod delta;
8mod dictionary;
9pub mod merge;
10mod streamer;
11pub mod value;
12
13mod sstable_index;
14pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
15pub(crate) mod vint;
16pub use dictionary::Dictionary;
17pub use streamer::{Streamer, StreamerBuilder};
18
19mod block_reader;
20pub use self::block_reader::BlockReader;
21pub use self::delta::{DeltaReader, DeltaWriter};
22pub use self::merge::VoidMerge;
23use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter};
24use crate::value::{RangeValueReader, RangeValueWriter};
25
/// Integer type used to number the terms (keys) of an sstable.
pub type TermOrdinal = u64;

/// Capacity reserved up-front for the key buffers of readers and writers, to avoid
/// reallocating for typical short keys.
const DEFAULT_KEY_CAPACITY: usize = 50;
29
/// Returns how many leading bytes `left` and `right` have in common.
///
/// The result never exceeds the length of the shorter slice.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    let mut shared = 0usize;
    for (left_byte, right_byte) in left.iter().zip(right.iter()) {
        if left_byte != right_byte {
            break;
        }
        shared += 1;
    }
    shared
}
39
/// Unit marker type for corrupted sstable data.
///
/// NOTE(review): not constructed in this file; presumably returned by decoding code
/// elsewhere in the crate when serialized data is invalid — confirm at the call sites.
#[derive(Clone, Copy, Debug)]
pub struct SSTableDataCorruption;
42
43pub trait SSTable: Sized {
46 type Value: Clone;
47 type ValueReader: ValueReader<Value = Self::Value>;
48 type ValueWriter: ValueWriter<Value = Self::Value>;
49
50 fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::ValueWriter> {
51 DeltaWriter::new(write)
52 }
53
54 fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
55 Writer::new(wrt)
56 }
57
58 fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::ValueReader> {
59 DeltaReader::new(reader)
60 }
61
62 fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::ValueReader> {
63 Reader {
64 key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
65 delta_reader: Self::delta_reader(reader),
66 }
67 }
68
69 fn create_empty_reader() -> Reader<'static, Self::ValueReader> {
71 Self::reader(&b""[..])
72 }
73
74 fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(
75 io_readers: Vec<R>,
76 w: W,
77 merger: M,
78 ) -> io::Result<()> {
79 let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
80 let writer = Self::writer(w);
81 merge::merge_sstable::<Self, _, _>(readers, writer, merger)
82 }
83}
84
85#[allow(dead_code)]
86pub struct VoidSSTable;
87
88impl SSTable for VoidSSTable {
89 type Value = ();
90 type ValueReader = value::VoidValueReader;
91 type ValueWriter = value::VoidValueWriter;
92}
93
94#[allow(dead_code)]
101pub struct MonotonicU64SSTable;
102
103impl SSTable for MonotonicU64SSTable {
104 type Value = u64;
105
106 type ValueReader = U64MonotonicValueReader;
107
108 type ValueWriter = U64MonotonicValueWriter;
109}
110
111pub struct RangeSSTable;
121
122impl SSTable for RangeSSTable {
123 type Value = Range<u64>;
124
125 type ValueReader = RangeValueReader;
126
127 type ValueWriter = RangeValueWriter;
128}
129
130pub struct Reader<'a, TValueReader> {
132 key: Vec<u8>,
133 delta_reader: DeltaReader<'a, TValueReader>,
134}
135
136impl<'a, TValueReader> Reader<'a, TValueReader>
137where TValueReader: ValueReader
138{
139 pub fn advance(&mut self) -> io::Result<bool> {
140 if !self.delta_reader.advance()? {
141 return Ok(false);
142 }
143 let common_prefix_len = self.delta_reader.common_prefix_len();
144 let suffix = self.delta_reader.suffix();
145 let new_len = self.delta_reader.common_prefix_len() + suffix.len();
146 self.key.resize(new_len, 0u8);
147 self.key[common_prefix_len..].copy_from_slice(suffix);
148 Ok(true)
149 }
150
151 #[inline(always)]
152 pub fn key(&self) -> &[u8] {
153 &self.key
154 }
155
156 #[inline(always)]
157 pub fn value(&self) -> &TValueReader::Value {
158 self.delta_reader.value()
159 }
160}
161
162impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
163 #[inline(always)]
164 fn as_ref(&self) -> &[u8] {
165 &self.key
166 }
167}
168
169pub struct Writer<W, TValueWriter>
170where W: io::Write
171{
172 previous_key: Vec<u8>,
173 index_builder: SSTableIndexBuilder,
174 delta_writer: DeltaWriter<W, TValueWriter>,
175 num_terms: u64,
176 first_ordinal_of_the_block: u64,
177}
178
179impl<W, TValueWriter> Writer<W, TValueWriter>
180where
181 W: io::Write,
182 TValueWriter: value::ValueWriter,
183{
184 #[doc(hidden)]
188 pub fn create(wrt: W) -> io::Result<Self> {
189 Ok(Self::new(wrt))
190 }
191
192 pub fn new(wrt: W) -> Self {
194 Writer {
195 previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
196 num_terms: 0u64,
197 index_builder: SSTableIndexBuilder::default(),
198 delta_writer: DeltaWriter::new(wrt),
199 first_ordinal_of_the_block: 0u64,
200 }
201 }
202
203 #[inline(always)]
207 pub(crate) fn last_inserted_key(&self) -> &[u8] {
208 &self.previous_key[..]
209 }
210
211 #[inline]
218 pub fn insert<K: AsRef<[u8]>>(
219 &mut self,
220 key: K,
221 value: &TValueWriter::Value,
222 ) -> io::Result<()> {
223 self.insert_key(key.as_ref())?;
224 self.insert_value(value)?;
225 Ok(())
226 }
227
228 #[doc(hidden)]
232 #[inline]
233 pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
234 if self.first_ordinal_of_the_block == self.num_terms {
237 self.index_builder
238 .shorten_last_block_key_given_next_key(key);
239 }
240 let keep_len = common_prefix_len(&self.previous_key, key);
241 let add_len = key.len() - keep_len;
242 let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
243 || self.previous_key.is_empty()
244 || self.previous_key[keep_len] < key[keep_len];
245 assert!(
246 increasing_keys,
247 "Keys should be increasing. ({:?} > {:?})",
248 self.previous_key, key
249 );
250 self.previous_key.resize(key.len(), 0u8);
251 self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
252 self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
253 Ok(())
254 }
255
256 #[doc(hidden)]
260 #[inline]
261 pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
262 self.delta_writer.write_value(value);
263 self.num_terms += 1u64;
264 self.flush_block_if_required()
265 }
266
267 pub fn flush_block_if_required(&mut self) -> io::Result<()> {
268 if let Some(byte_range) = self.delta_writer.flush_block_if_required()? {
269 self.index_builder.add_block(
270 &self.previous_key[..],
271 byte_range,
272 self.first_ordinal_of_the_block,
273 );
274 self.first_ordinal_of_the_block = self.num_terms;
275 self.previous_key.clear();
276 }
277 Ok(())
278 }
279
280 pub fn finish(mut self) -> io::Result<W> {
281 if let Some(byte_range) = self.delta_writer.flush_block()? {
282 self.index_builder.add_block(
283 &self.previous_key[..],
284 byte_range,
285 self.first_ordinal_of_the_block,
286 );
287 self.first_ordinal_of_the_block = self.num_terms;
288 }
289 let mut wrt = self.delta_writer.finish();
290 wrt.write_all(&0u32.to_le_bytes())?;
291
292 let offset = wrt.written_bytes();
293
294 self.index_builder.serialize(&mut wrt)?;
295 wrt.write_all(&offset.to_le_bytes())?;
296 wrt.write_all(&self.num_terms.to_le_bytes())?;
297 let wrt = wrt.finish();
298 Ok(wrt.into_inner()?)
299 }
300}
301
302impl<TValueWriter> Writer<Vec<u8>, TValueWriter>
303where TValueWriter: value::ValueWriter
304{
305 #[inline]
306 pub fn insert_cannot_fail<K: AsRef<[u8]>>(&mut self, key: K, value: &TValueWriter::Value) {
307 self.insert(key, value)
308 .expect("SSTable over a Vec should never fail");
309 }
310}
311
#[cfg(test)]
mod test {
    use std::io;
    use std::ops::Bound;

    use common::file_slice::FileSlice;
    use proptest::prelude::*;

    use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable};
    use crate::Dictionary;

    /// Checks `common_prefix_len` in both argument orders, since the function is symmetric.
    fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
        assert_eq!(
            common_prefix_len(left.as_bytes(), right.as_bytes()),
            expect_len
        );
        assert_eq!(
            common_prefix_len(right.as_bytes(), left.as_bytes()),
            expect_len
        );
    }

    #[test]
    fn test_common_prefix_len() {
        aux_test_common_prefix_len("a", "ab", 1);
        aux_test_common_prefix_len("", "ab", 0);
        aux_test_common_prefix_len("ab", "abc", 2);
        aux_test_common_prefix_len("abde", "abce", 2);
    }

    /// Round-trips long keys whose delta encoding shares little or no prefix.
    #[test]
    fn test_long_key_diff() {
        let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&long_key[..], &()).is_ok());
            assert!(sstable_writer.insert(&[0, 3, 4], &()).is_ok());
            assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[0, 3, 4]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key2[..]);
        assert!(!sstable_reader.advance().unwrap());
    }

    /// Pins the exact serialized layout: one delta-encoded block, a zero terminator,
    /// the serialized index, the index offset, and the term count.
    #[test]
    fn test_simple_sstable() {
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&[17u8], &()).is_ok());
            assert!(sstable_writer.insert(&[17u8, 18u8, 19u8], &()).is_ok());
            assert!(sstable_writer.insert(&[17u8, 20u8], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        assert_eq!(
            &buffer,
            &[
                7u8, 0u8, 0u8, 0u8, 16u8, 17u8, 33u8, 18u8, 19u8, 17u8, 20u8, 0u8, 0u8, 0u8, 0u8,
                161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 115, 108, 97, 115, 116, 95, 107,
                101, 121, 95, 111, 114, 95, 103, 114, 101, 97, 116, 101, 114, 130, 17, 20, 106, 98,
                108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106, 98, 121, 116, 101, 95, 114, 97,
                110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0, 99, 101, 110, 100, 11, 109,
                102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0,
                0, 0, 0, 3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8
            ]
        );
        let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
        assert!(!sstable_reader.advance().unwrap());
    }

    /// Inserting a smaller key must trip the writer's ordering assertion specifically,
    /// not just any panic.
    #[test]
    #[should_panic(expected = "Keys should be increasing")]
    fn test_simple_sstable_non_increasing_key() {
        let mut buffer = vec![];
        let mut sstable_writer = VoidSSTable::writer(&mut buffer);
        assert!(sstable_writer.insert(&[17u8], &()).is_ok());
        assert!(sstable_writer.insert(&[16u8], &()).is_ok());
    }

    /// Merging an sstable with itself under `VoidMerge` must reproduce it byte for byte.
    #[test]
    fn test_merge_abcd_abe() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            writer.insert(b"abcd", &()).unwrap();
            writer.insert(b"abe", &()).unwrap();
            writer.finish().unwrap();
        }
        let mut output = Vec::new();
        assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
        assert_eq!(&output[..], &buffer[..]);
    }

    /// Same as `test_merge_abcd_abe`, additionally checking `last_inserted_key`.
    #[test]
    fn test_sstable() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            assert_eq!(writer.last_inserted_key(), b"");
            writer.insert(b"abcd", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abcd");
            writer.insert(b"abe", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abe");
            writer.finish().unwrap();
        }
        let mut output = Vec::new();
        assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
        assert_eq!(&output[..], &buffer[..]);
    }

    #[test]
    fn test_sstable_u64() -> io::Result<()> {
        let mut buffer = Vec::new();
        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
        writer.insert(b"abcd", &1u64)?;
        writer.insert(b"abe", &4u64)?;
        writer.insert(b"gogo", &4324234234234234u64)?;
        writer.finish()?;
        let mut reader = MonotonicU64SSTable::reader(&buffer[..]);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abcd");
        assert_eq!(reader.value(), &1u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abe");
        assert_eq!(reader.value(), &4u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"gogo");
        assert_eq!(reader.value(), &4324234234234234u64);
        assert!(!reader.advance()?);
        Ok(())
    }

    #[test]
    fn test_sstable_empty() {
        let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader();
        assert!(!sstable_range_empty.advance().unwrap());
    }

    /// Generates an unbounded, inclusive, or exclusive bound over short keys made of `a`-`c`.
    fn bound_strategy() -> impl Strategy<Value = Bound<String>> {
        prop_oneof![
            Just(Bound::<String>::Unbounded),
            "[a-c]{0,5}".prop_map(Bound::Included),
            "[a-c]{0,5}".prop_map(Bound::Excluded),
        ]
    }

    /// Returns the key carried by a bound, or `None` for `Unbounded`.
    fn extract_key(bound: Bound<&String>) -> Option<&str> {
        match bound.as_ref() {
            Bound::Included(key) => Some(key.as_str()),
            Bound::Excluded(key) => Some(key.as_str()),
            Bound::Unbounded => None,
        }
    }

    /// Generates `(lower, upper)` bound pairs. When both bounds carry a key, the lower key
    /// must be strictly smaller; this also rules out the equal-keys cases that
    /// `BTreeSet::range` rejects.
    fn bounds_strategy() -> impl Strategy<Value = (Bound<String>, Bound<String>)> {
        (bound_strategy(), bound_strategy()).prop_filter(
            "Lower bound <= Upper bound",
            |(left, right)| match (extract_key(left.as_ref()), extract_key(right.as_ref())) {
                (None, _) => true,
                (_, None) => true,
                (left, right) => left < right,
            },
        )
    }

    proptest! {
        /// Range queries on a `Dictionary` must return exactly the keys that
        /// `BTreeSet::range` returns for the same bounds.
        #[test]
        fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
            (lower_bound, upper_bound) in bounds_strategy(),
        ) {
            let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
            for word in &words {
                builder.insert(word.as_bytes(), &()).unwrap();
            }
            let buffer: Vec<u8> = builder.finish().unwrap();
            let dictionary: Dictionary<VoidSSTable> = Dictionary::open(FileSlice::from(buffer)).unwrap();
            let mut range_builder = dictionary.range();
            range_builder = match lower_bound.as_ref() {
                Bound::Included(key) => range_builder.ge(key.as_bytes()),
                Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            range_builder = match upper_bound.as_ref() {
                Bound::Included(key) => range_builder.le(key.as_bytes()),
                Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            let mut stream = range_builder.into_stream().unwrap();
            let mut btree_set_range = words.range((lower_bound, upper_bound));
            while stream.advance() {
                let val = btree_set_range.next().unwrap();
                assert_eq!(val.as_bytes(), stream.key());
            }
            assert!(btree_set_range.next().is_none());
        }
    }
}