summavy_sstable/
sstable_index.rs1use std::io;
2use std::ops::Range;
3
4use serde::{Deserialize, Serialize};
5
6use crate::{common_prefix_len, SSTableDataCorruption};
7
8#[derive(Default, Debug, Serialize, Deserialize)]
9pub struct SSTableIndex {
10 blocks: Vec<BlockMeta>,
11}
12
13impl SSTableIndex {
14 pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
15 ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption)
16 }
17
18 pub fn search_block(&self, key: &[u8]) -> Option<BlockAddr> {
19 self.blocks
20 .iter()
21 .find(|block| &block.last_key_or_greater[..] >= key)
22 .map(|block| block.block_addr.clone())
23 }
24}
25
26#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
27pub struct BlockAddr {
28 pub byte_range: Range<usize>,
29 pub first_ordinal: u64,
30}
31
32#[derive(Debug, Serialize, Deserialize)]
33struct BlockMeta {
34 pub last_key_or_greater: Vec<u8>,
38 pub block_addr: BlockAddr,
39}
40
41#[derive(Default)]
42pub struct SSTableIndexBuilder {
43 index: SSTableIndex,
44}
45
46fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
50 assert!(&left[..] < right);
51 let common_len = common_prefix_len(left, right);
52 if left.len() == common_len {
53 return;
54 }
55 for pos in (common_len + 1)..left.len() {
58 if left[pos] != u8::MAX {
59 left[pos] += 1;
60 left.truncate(pos + 1);
61 return;
62 }
63 }
64}
65
66impl SSTableIndexBuilder {
67 pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
71 if let Some(last_block) = self.index.blocks.last_mut() {
72 find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
73 }
74 }
75
76 pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
77 self.index.blocks.push(BlockMeta {
78 last_key_or_greater: last_key.to_vec(),
79 block_addr: BlockAddr {
80 byte_range,
81 first_ordinal,
82 },
83 })
84 }
85
86 pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
87 ciborium::ser::into_writer(&self.index, wrt)
88 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
95 use crate::SSTableDataCorruption;
96
97 #[test]
98 fn test_sstable_index() {
99 let mut sstable_builder = SSTableIndexBuilder::default();
100 sstable_builder.add_block(b"aaa", 10..20, 0u64);
101 sstable_builder.add_block(b"bbbbbbb", 20..30, 564);
102 sstable_builder.add_block(b"ccc", 30..40, 10u64);
103 sstable_builder.add_block(b"dddd", 40..50, 15u64);
104 let mut buffer: Vec<u8> = Vec::new();
105 sstable_builder.serialize(&mut buffer).unwrap();
106 let sstable_index = SSTableIndex::load(&buffer[..]).unwrap();
107 assert_eq!(
108 sstable_index.search_block(b"bbbde"),
109 Some(BlockAddr {
110 first_ordinal: 10u64,
111 byte_range: 30..40
112 })
113 );
114 }
115
116 #[test]
117 fn test_sstable_with_corrupted_data() {
118 let mut sstable_builder = SSTableIndexBuilder::default();
119 sstable_builder.add_block(b"aaa", 10..20, 0u64);
120 sstable_builder.add_block(b"bbbbbbb", 20..30, 564);
121 sstable_builder.add_block(b"ccc", 30..40, 10u64);
122 sstable_builder.add_block(b"dddd", 40..50, 15u64);
123 let mut buffer: Vec<u8> = Vec::new();
124 sstable_builder.serialize(&mut buffer).unwrap();
125 buffer[1] = 9u8;
126 let data_corruption_err = SSTableIndex::load(&buffer[..]).err().unwrap();
127 assert!(matches!(data_corruption_err, SSTableDataCorruption));
128 }
129
130 #[track_caller]
131 fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
132 let mut left_buf = left.to_vec();
133 super::find_shorter_str_in_between(&mut left_buf, right);
134 assert!(left_buf.len() <= left.len());
135 assert!(left <= &left_buf);
136 assert!(&left_buf[..] < right);
137 }
138
139 #[test]
140 fn test_find_shorter_str_in_between() {
141 test_find_shorter_str_in_between_aux(b"", b"hello");
142 test_find_shorter_str_in_between_aux(b"abc", b"abcd");
143 test_find_shorter_str_in_between_aux(b"abcd", b"abd");
144 test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
145 test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
146 test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
147 }
148
149 use proptest::prelude::*;
150
151 proptest! {
152 #![proptest_config(ProptestConfig::with_cases(100))]
153 #[test]
154 fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
155 if left < right {
156 test_find_shorter_str_in_between_aux(&left, &right);
157 }
158 }
159 }
160}