1use std::io::{self, Write};
39use std::ops::Range;
40
41use merge::ValueMerger;
42
43mod block_match_automaton;
44mod delta;
45mod dictionary;
46pub mod merge;
47mod streamer;
48pub mod value;
49
50mod sstable_index_v3;
51pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
52mod sstable_index_v2;
53pub(crate) mod vint;
54pub use dictionary::Dictionary;
55pub use streamer::{Streamer, StreamerBuilder};
56
57mod block_reader;
58use common::{BinarySerializable, OwnedBytes};
59use value::{VecU32ValueReader, VecU32ValueWriter};
60
61pub use self::block_reader::BlockReader;
62pub use self::delta::{DeltaReader, DeltaWriter};
63pub use self::merge::VoidMerge;
64use self::value::{U64MonotonicValueReader, U64MonotonicValueWriter, ValueReader, ValueWriter};
65use crate::value::{RangeValueReader, RangeValueWriter};
66
67pub type TermOrdinal = u64;
68
69const DEFAULT_KEY_CAPACITY: usize = 50;
70const SSTABLE_VERSION: u32 = 3;
71
/// Returns the number of leading bytes that `left` and `right` have in common.
///
/// The result is the index of the first position at which the two slices differ,
/// or the length of the shorter slice when one is a prefix of the other.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
    let shortest_len = left.len().min(right.len());
    left.iter()
        .zip(right)
        .position(|(left_byte, right_byte)| left_byte != right_byte)
        .unwrap_or(shortest_len)
}
81
/// Zero-sized marker type.
///
/// NOTE(review): judging by its name this is meant to signal that sstable data could
/// not be decoded; it is not used in this part of the file, so confirm against its
/// call sites.
#[derive(Debug, Copy, Clone)]
pub struct SSTableDataCorruption;
84
85pub trait SSTable: Sized {
88 type Value: Clone;
89 type ValueReader: ValueReader<Value = Self::Value>;
90 type ValueWriter: ValueWriter<Value = Self::Value>;
91
92 fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::ValueWriter> {
93 DeltaWriter::new(write)
94 }
95
96 fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
97 Writer::new(wrt)
98 }
99
100 fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
101 DeltaReader::new(reader)
102 }
103
104 fn reader(reader: OwnedBytes) -> Reader<Self::ValueReader> {
105 Reader {
106 key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
107 delta_reader: Self::delta_reader(reader),
108 }
109 }
110
111 fn create_empty_reader() -> Reader<Self::ValueReader> {
113 Self::reader(OwnedBytes::empty())
114 }
115
116 fn merge<W: io::Write, M: ValueMerger<Self::Value>>(
117 io_readers: Vec<OwnedBytes>,
118 w: W,
119 merger: M,
120 ) -> io::Result<()> {
121 let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
122 let writer = Self::writer(w);
123 merge::merge_sstable::<Self, _, _>(readers, writer, merger)
124 }
125}
126
/// SSTable flavor that stores no payload: its value type is `()`.
pub struct VoidSSTable;

impl SSTable for VoidSSTable {
    type Value = ();
    type ValueReader = value::VoidValueReader;
    type ValueWriter = value::VoidValueWriter;
}
134
/// SSTable flavor associating a `u64` value with each key.
///
/// Values are encoded/decoded with `U64MonotonicValueWriter`/`U64MonotonicValueReader`.
/// NOTE(review): the codec name suggests values are expected to be monotonic across
/// keys — confirm against the `value` module before relying on it.
pub struct MonotonicU64SSTable;

impl SSTable for MonotonicU64SSTable {
    type Value = u64;

    type ValueReader = U64MonotonicValueReader;

    type ValueWriter = U64MonotonicValueWriter;
}
150
/// SSTable flavor associating a `Range<u64>` value with each key.
///
/// Values are encoded/decoded with `RangeValueWriter`/`RangeValueReader`.
/// NOTE(review): any constraint on the ranges (ordering, contiguity) is defined by
/// that codec and is not visible here — check the `value` module.
#[derive(Clone, Copy, Debug)]
pub struct RangeSSTable;

impl SSTable for RangeSSTable {
    type Value = Range<u64>;

    type ValueReader = RangeValueReader;

    type ValueWriter = RangeValueWriter;
}
170
/// SSTable flavor associating a `Vec<u32>` value with each key.
pub struct VecU32ValueSSTable;

impl SSTable for VecU32ValueSSTable {
    type Value = Vec<u32>;
    type ValueReader = VecU32ValueReader;
    type ValueWriter = VecU32ValueWriter;
}
179
/// Sequential reader over the (key, value) entries of an sstable.
///
/// Keys are stored as (common prefix length, suffix) deltas; the reader keeps the
/// fully reconstructed current key in an internal buffer.
pub struct Reader<TValueReader> {
    // Current key, rebuilt in place on every successful `advance()`.
    key: Vec<u8>,
    // Underlying decoder yielding the prefix length, key suffix and value of each entry.
    delta_reader: DeltaReader<TValueReader>,
}
185
186impl<TValueReader> Reader<TValueReader>
187where TValueReader: ValueReader
188{
189 pub fn advance(&mut self) -> io::Result<bool> {
190 if !self.delta_reader.advance()? {
191 return Ok(false);
192 }
193 let common_prefix_len = self.delta_reader.common_prefix_len();
194 let suffix = self.delta_reader.suffix();
195 let new_len = self.delta_reader.common_prefix_len() + suffix.len();
196 self.key.resize(new_len, 0u8);
197 self.key[common_prefix_len..].copy_from_slice(suffix);
198 Ok(true)
199 }
200
201 #[inline(always)]
202 pub fn key(&self) -> &[u8] {
203 &self.key
204 }
205
206 #[inline(always)]
207 pub fn value(&self) -> &TValueReader::Value {
208 self.delta_reader.value()
209 }
210}
211
212impl<TValueReader> AsRef<[u8]> for Reader<TValueReader> {
213 #[inline(always)]
214 fn as_ref(&self) -> &[u8] {
215 &self.key
216 }
217}
218
/// Streaming sstable writer.
///
/// Keys are inserted in increasing order; entries are delta-encoded into blocks and a
/// block index is serialized after the data when `finish()` is called.
pub struct Writer<W, TValueWriter>
where W: io::Write
{
    // Last key inserted into the current block; cleared whenever a block is flushed
    // by `flush_block_if_required`.
    previous_key: Vec<u8>,
    // Collects, for each flushed block, its last key, byte range and first ordinal.
    index_builder: SSTableIndexBuilder,
    // Encodes key suffixes and values, and decides when a block must be flushed.
    delta_writer: DeltaWriter<W, TValueWriter>,
    // Number of values inserted so far.
    num_terms: u64,
    // Value of `num_terms` at the time the current block was started.
    first_ordinal_of_the_block: u64,
}
228
229impl<W, TValueWriter> Writer<W, TValueWriter>
230where
231 W: io::Write,
232 TValueWriter: value::ValueWriter,
233{
234 #[doc(hidden)]
238 pub fn create(wrt: W) -> io::Result<Self> {
239 Ok(Self::new(wrt))
240 }
241
242 pub fn new(wrt: W) -> Self {
244 Writer {
245 previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
246 num_terms: 0u64,
247 index_builder: SSTableIndexBuilder::default(),
248 delta_writer: DeltaWriter::new(wrt),
249 first_ordinal_of_the_block: 0u64,
250 }
251 }
252
253 pub fn set_block_len(&mut self, block_len: usize) {
258 self.delta_writer.set_block_len(block_len)
259 }
260
261 #[inline(always)]
265 pub(crate) fn last_inserted_key(&self) -> &[u8] {
266 &self.previous_key[..]
267 }
268
269 #[inline]
276 pub fn insert<K: AsRef<[u8]>>(
277 &mut self,
278 key: K,
279 value: &TValueWriter::Value,
280 ) -> io::Result<()> {
281 self.insert_key(key.as_ref())?;
282 self.insert_value(value)?;
283 Ok(())
284 }
285
286 #[doc(hidden)]
290 #[inline]
291 pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
292 if self.first_ordinal_of_the_block == self.num_terms {
295 self.index_builder
296 .shorten_last_block_key_given_next_key(key);
297 }
298 let keep_len = common_prefix_len(&self.previous_key, key);
299 let add_len = key.len() - keep_len;
300 let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
301 || self.previous_key.is_empty()
302 || self.previous_key[keep_len] < key[keep_len];
303 assert!(
304 increasing_keys,
305 "Keys should be increasing. ({:?} > {key:?})",
306 self.previous_key
307 );
308 self.previous_key.resize(key.len(), 0u8);
309 self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
310 self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
311 Ok(())
312 }
313
314 #[doc(hidden)]
318 #[inline]
319 pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
320 self.delta_writer.write_value(value);
321 self.num_terms += 1u64;
322 self.flush_block_if_required()
323 }
324
325 pub fn flush_block_if_required(&mut self) -> io::Result<()> {
326 if let Some(byte_range) = self.delta_writer.flush_block_if_required()? {
327 self.index_builder.add_block(
328 &self.previous_key[..],
329 byte_range,
330 self.first_ordinal_of_the_block,
331 );
332 self.first_ordinal_of_the_block = self.num_terms;
333 self.previous_key.clear();
334 }
335 Ok(())
336 }
337
338 pub fn finish(mut self) -> io::Result<W> {
339 if let Some(byte_range) = self.delta_writer.flush_block()? {
340 self.index_builder.add_block(
341 &self.previous_key[..],
342 byte_range,
343 self.first_ordinal_of_the_block,
344 );
345 self.first_ordinal_of_the_block = self.num_terms;
346 }
347 let mut wrt = self.delta_writer.finish();
348 wrt.write_all(&0u32.to_le_bytes())?;
350
351 let offset = wrt.written_bytes();
352
353 let fst_len: u64 = self.index_builder.serialize(&mut wrt)?;
354 wrt.write_all(&fst_len.to_le_bytes())?;
355 wrt.write_all(&offset.to_le_bytes())?;
356 wrt.write_all(&self.num_terms.to_le_bytes())?;
357
358 SSTABLE_VERSION.serialize(&mut wrt)?;
359
360 let wrt = wrt.finish();
361 Ok(wrt.into_inner()?)
362 }
363}
364
#[cfg(test)]
mod test {
    use std::io;
    use std::ops::Bound;

    use common::OwnedBytes;
    use common::file_slice::FileSlice;
    use proptest::prelude::*;

    use super::{MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable, common_prefix_len};
    use crate::Dictionary;

    /// Checks `common_prefix_len` in both argument orders.
    fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
        assert_eq!(
            common_prefix_len(left.as_bytes(), right.as_bytes()),
            expect_len
        );
        assert_eq!(
            common_prefix_len(right.as_bytes(), left.as_bytes()),
            expect_len
        );
    }

    #[test]
    fn test_common_prefix_len() {
        aux_test_common_prefix_len("a", "ab", 1);
        aux_test_common_prefix_len("", "ab", 0);
        aux_test_common_prefix_len("ab", "abc", 2);
        aux_test_common_prefix_len("abde", "abce", 2);
    }

    /// Round-trips keys that are much longer than the default key capacity.
    #[test]
    fn test_long_key_diff() {
        let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert(&long_key[..], &()).is_ok());
            assert!(sstable_writer.insert([0, 3, 4], &()).is_ok());
            assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        let buffer = OwnedBytes::new(buffer);
        let mut sstable_reader = VoidSSTable::reader(buffer);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key[..]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[0, 3, 4]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &long_key2[..]);
        assert!(!sstable_reader.advance().unwrap());
    }

    /// Pins the exact serialized bytes of a tiny sstable, then reads it back.
    #[test]
    fn test_simple_sstable() {
        let mut buffer = vec![];
        {
            let mut sstable_writer = VoidSSTable::writer(&mut buffer);
            assert!(sstable_writer.insert([17u8], &()).is_ok());
            assert!(sstable_writer.insert([17u8, 18u8, 19u8], &()).is_ok());
            assert!(sstable_writer.insert([17u8, 20u8], &()).is_ok());
            assert!(sstable_writer.finish().is_ok());
        }
        assert_eq!(
            &buffer,
            &[
                // Data block, as emitted by the delta writer.
                // NOTE(review): presumably a u32 block length, one compression flag
                // byte, then the delta-encoded keys — confirm against `delta`.
                8, 0, 0, 0, 0, 16, 17, 33, 18, 19, 17, 20,
                // Zero u32 written by `Writer::finish` right after the data.
                0, 0, 0, 0,
                // Footer written by `Writer::finish`:
                // serialized index length (u64): nothing was written for the index,
                0, 0, 0, 0, 0, 0, 0, 0,
                // offset at which the index starts (u64),
                16, 0, 0, 0, 0, 0, 0, 0,
                // number of entries (u64),
                3, 0, 0, 0, 0, 0, 0, 0,
                // SSTABLE_VERSION (u32).
                3, 0, 0, 0,
            ]
        );
        let buffer = OwnedBytes::new(buffer);
        let mut sstable_reader = VoidSSTable::reader(buffer);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
        assert!(sstable_reader.advance().unwrap());
        assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
        assert!(!sstable_reader.advance().unwrap());
    }

    /// Inserting a smaller key must trip the writer's ordering assertion. The expected
    /// message is pinned so that an unrelated panic cannot make this test pass.
    #[test]
    #[should_panic(expected = "Keys should be increasing")]
    fn test_simple_sstable_non_increasing_key() {
        let mut buffer = vec![];
        let mut sstable_writer = VoidSSTable::writer(&mut buffer);
        assert!(sstable_writer.insert([17u8], &()).is_ok());
        assert!(sstable_writer.insert([16u8], &()).is_ok());
    }

    /// Merging an sstable with itself must yield the very same bytes.
    #[test]
    fn test_merge_abcd_abe() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            writer.insert(b"abcd", &()).unwrap();
            writer.insert(b"abe", &()).unwrap();
            writer.finish().unwrap();
        }
        let buffer = OwnedBytes::new(buffer);
        let mut output = Vec::new();
        assert!(
            VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
                .is_ok()
        );
        assert_eq!(&output[..], &buffer[..]);
    }

    /// Same as `test_merge_abcd_abe`, additionally checking `last_inserted_key`.
    #[test]
    fn test_sstable() {
        let mut buffer = Vec::new();
        {
            let mut writer = VoidSSTable::writer(&mut buffer);
            assert_eq!(writer.last_inserted_key(), b"");
            writer.insert(b"abcd", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abcd");
            writer.insert(b"abe", &()).unwrap();
            assert_eq!(writer.last_inserted_key(), b"abe");
            writer.finish().unwrap();
        }
        let buffer = OwnedBytes::new(buffer);
        let mut output = Vec::new();
        assert!(
            VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
                .is_ok()
        );
        assert_eq!(&output[..], &buffer[..]);
    }

    /// Round-trips keys together with their `u64` values.
    #[test]
    fn test_sstable_u64() -> io::Result<()> {
        let mut buffer = Vec::new();
        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
        writer.insert(b"abcd", &1u64)?;
        writer.insert(b"abe", &4u64)?;
        writer.insert(b"gogo", &4324234234234234u64)?;
        writer.finish()?;
        let buffer = OwnedBytes::new(buffer);
        let mut reader = MonotonicU64SSTable::reader(buffer);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abcd");
        assert_eq!(reader.value(), &1u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"abe");
        assert_eq!(reader.value(), &4u64);
        assert!(reader.advance()?);
        assert_eq!(reader.key(), b"gogo");
        assert_eq!(reader.value(), &4324234234234234u64);
        assert!(!reader.advance()?);
        Ok(())
    }

    /// A reader over no bytes at all reports no entry.
    #[test]
    fn test_sstable_empty() {
        let mut sstable_range_empty = crate::RangeSSTable::create_empty_reader();
        assert!(!sstable_range_empty.advance().unwrap());
    }

    /// Generates an unbounded, included or excluded bound over short `[a-c]` strings.
    fn bound_strategy() -> impl Strategy<Value = Bound<String>> {
        prop_oneof![
            Just(Bound::<String>::Unbounded),
            "[a-c]{0,5}".prop_map(Bound::Included),
            "[a-c]{0,5}".prop_map(Bound::Excluded),
        ]
    }

    /// Returns the key carried by `bound`, if any.
    fn extract_key(bound: Bound<&String>) -> Option<&str> {
        match bound.as_ref() {
            Bound::Included(key) => Some(key.as_str()),
            Bound::Excluded(key) => Some(key.as_str()),
            Bound::Unbounded => None,
        }
    }

    /// Generates a (lower, upper) pair of bounds.
    ///
    /// When both bounds carry a key, the pair is kept only if the lower key is
    /// strictly smaller than the upper key.
    fn bounds_strategy() -> impl Strategy<Value = (Bound<String>, Bound<String>)> {
        (bound_strategy(), bound_strategy()).prop_filter(
            "Lower bound <= Upper bound",
            |(left, right)| match (extract_key(left.as_ref()), extract_key(right.as_ref())) {
                (None, _) => true,
                (_, None) => true,
                (left, right) => left < right,
            },
        )
    }

    proptest! {
        // Range queries over a dictionary must return exactly what the same range
        // returns on the `BTreeSet` the dictionary was built from.
        #[test]
        fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
            (lower_bound, upper_bound) in bounds_strategy(),
        ) {
            let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
            // Small block length, so that the words are likely spread over several blocks.
            builder.set_block_len(16);
            for word in &words {
                builder.insert(word.as_bytes(), &()).unwrap();
            }
            let buffer: Vec<u8> = builder.finish().unwrap();
            let dictionary: Dictionary<VoidSSTable> = Dictionary::open(FileSlice::from(buffer)).unwrap();
            let mut range_builder = dictionary.range();
            range_builder = match lower_bound.as_ref() {
                Bound::Included(key) => range_builder.ge(key.as_bytes()),
                Bound::Excluded(key) => range_builder.gt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            range_builder = match upper_bound.as_ref() {
                Bound::Included(key) => range_builder.le(key.as_bytes()),
                Bound::Excluded(key) => range_builder.lt(key.as_bytes()),
                Bound::Unbounded => range_builder,
            };
            let mut stream = range_builder.into_stream().unwrap();
            let mut btree_set_range = words.range((lower_bound, upper_bound));
            while stream.advance() {
                let val = btree_set_range.next().unwrap();
                assert_eq!(val.as_bytes(), stream.key());
            }
            assert!(btree_set_range.next().is_none());
        }
    }
}