summavy_sstable/
sstable_index.rs

1use std::io;
2use std::ops::Range;
3
4use serde::{Deserialize, Serialize};
5
6use crate::{common_prefix_len, SSTableDataCorruption};
7
8#[derive(Default, Debug, Serialize, Deserialize)]
9pub struct SSTableIndex {
10    blocks: Vec<BlockMeta>,
11}
12
13impl SSTableIndex {
14    pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
15        ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption)
16    }
17
18    pub fn search_block(&self, key: &[u8]) -> Option<BlockAddr> {
19        self.blocks
20            .iter()
21            .find(|block| &block.last_key_or_greater[..] >= key)
22            .map(|block| block.block_addr.clone())
23    }
24}
25
26#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
27pub struct BlockAddr {
28    pub byte_range: Range<usize>,
29    pub first_ordinal: u64,
30}
31
32#[derive(Debug, Serialize, Deserialize)]
33struct BlockMeta {
34    /// Any byte string that is lexicographically greater or equal to
35    /// the last key in the block,
36    /// and yet strictly smaller than the first key in the next block.
37    pub last_key_or_greater: Vec<u8>,
38    pub block_addr: BlockAddr,
39}
40
41#[derive(Default)]
42pub struct SSTableIndexBuilder {
43    index: SSTableIndex,
44}
45
46/// Given that left < right,
47/// mutates `left into a shorter byte string left'` that
48/// matches `left <= left' < right`.
49fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
50    assert!(&left[..] < right);
51    let common_len = common_prefix_len(left, right);
52    if left.len() == common_len {
53        return;
54    }
55    // It is possible to do one character shorter in some case,
56    // but it is not worth the extra complexity
57    for pos in (common_len + 1)..left.len() {
58        if left[pos] != u8::MAX {
59            left[pos] += 1;
60            left.truncate(pos + 1);
61            return;
62        }
63    }
64}
65
66impl SSTableIndexBuilder {
67    /// In order to make the index as light as possible, we
68    /// try to find a shorter alternative to the last key of the last block
69    /// that is still smaller than the next key.
70    pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
71        if let Some(last_block) = self.index.blocks.last_mut() {
72            find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
73        }
74    }
75
76    pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
77        self.index.blocks.push(BlockMeta {
78            last_key_or_greater: last_key.to_vec(),
79            block_addr: BlockAddr {
80                byte_range,
81                first_ordinal,
82            },
83        })
84    }
85
86    pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
87        ciborium::ser::into_writer(&self.index, wrt)
88            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
95    use crate::SSTableDataCorruption;
96
97    #[test]
98    fn test_sstable_index() {
99        let mut sstable_builder = SSTableIndexBuilder::default();
100        sstable_builder.add_block(b"aaa", 10..20, 0u64);
101        sstable_builder.add_block(b"bbbbbbb", 20..30, 564);
102        sstable_builder.add_block(b"ccc", 30..40, 10u64);
103        sstable_builder.add_block(b"dddd", 40..50, 15u64);
104        let mut buffer: Vec<u8> = Vec::new();
105        sstable_builder.serialize(&mut buffer).unwrap();
106        let sstable_index = SSTableIndex::load(&buffer[..]).unwrap();
107        assert_eq!(
108            sstable_index.search_block(b"bbbde"),
109            Some(BlockAddr {
110                first_ordinal: 10u64,
111                byte_range: 30..40
112            })
113        );
114    }
115
116    #[test]
117    fn test_sstable_with_corrupted_data() {
118        let mut sstable_builder = SSTableIndexBuilder::default();
119        sstable_builder.add_block(b"aaa", 10..20, 0u64);
120        sstable_builder.add_block(b"bbbbbbb", 20..30, 564);
121        sstable_builder.add_block(b"ccc", 30..40, 10u64);
122        sstable_builder.add_block(b"dddd", 40..50, 15u64);
123        let mut buffer: Vec<u8> = Vec::new();
124        sstable_builder.serialize(&mut buffer).unwrap();
125        buffer[1] = 9u8;
126        let data_corruption_err = SSTableIndex::load(&buffer[..]).err().unwrap();
127        assert!(matches!(data_corruption_err, SSTableDataCorruption));
128    }
129
130    #[track_caller]
131    fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
132        let mut left_buf = left.to_vec();
133        super::find_shorter_str_in_between(&mut left_buf, right);
134        assert!(left_buf.len() <= left.len());
135        assert!(left <= &left_buf);
136        assert!(&left_buf[..] < right);
137    }
138
139    #[test]
140    fn test_find_shorter_str_in_between() {
141        test_find_shorter_str_in_between_aux(b"", b"hello");
142        test_find_shorter_str_in_between_aux(b"abc", b"abcd");
143        test_find_shorter_str_in_between_aux(b"abcd", b"abd");
144        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
145        test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
146        test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
147    }
148
149    use proptest::prelude::*;
150
151    proptest! {
152        #![proptest_config(ProptestConfig::with_cases(100))]
153        #[test]
154        fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
155            if left < right {
156                test_find_shorter_str_in_between_aux(&left, &right);
157            }
158        }
159    }
160}