1pub(crate) mod bloom;
2mod builder;
3mod iterator;
4
5use std::fs::File;
6use std::path::Path;
7use std::sync::Arc;
8
9use anyhow::{anyhow, bail, Result};
10pub use builder::SsTableBuilder;
11use bytes::{Buf, BufMut};
12pub use iterator::SsTableIterator;
13
14use crate::block::Block;
15use crate::key::{KeyBytes, KeySlice};
16use crate::lsm_storage::BlockCache;
17
18use self::bloom::Bloom;
19
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub struct BlockMeta {
22 pub offset: usize,
24 pub first_key: KeyBytes,
26 pub last_key: KeyBytes,
28}
29
30impl BlockMeta {
31 pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
33 let mut estimated_size = std::mem::size_of::<u32>();
34 for meta in block_meta {
35 estimated_size += std::mem::size_of::<u32>();
37 estimated_size += std::mem::size_of::<u16>();
39 estimated_size += meta.first_key.len();
41 estimated_size += std::mem::size_of::<u16>();
43 estimated_size += meta.last_key.len();
45 }
46 estimated_size += std::mem::size_of::<u32>();
47 buf.reserve(estimated_size);
50 let original_len = buf.len();
51 buf.put_u32(block_meta.len() as u32);
52 for meta in block_meta {
53 buf.put_u32(meta.offset as u32);
54 buf.put_u16(meta.first_key.len() as u16);
55 buf.put_slice(meta.first_key.raw_ref());
56 buf.put_u16(meta.last_key.len() as u16);
57 buf.put_slice(meta.last_key.raw_ref());
58 }
59 buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
60 assert_eq!(estimated_size, buf.len() - original_len);
61 }
62
63 pub fn decode_block_meta(mut buf: &[u8]) -> Result<Vec<BlockMeta>> {
65 let mut block_meta = Vec::new();
66 let num = buf.get_u32() as usize;
67 let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
68 for _ in 0..num {
69 let offset = buf.get_u32() as usize;
70 let first_key_len = buf.get_u16() as usize;
71 let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len));
72 let last_key_len: usize = buf.get_u16() as usize;
73 let last_key = KeyBytes::from_bytes(buf.copy_to_bytes(last_key_len));
74 block_meta.push(BlockMeta {
75 offset,
76 first_key,
77 last_key,
78 });
79 }
80 if buf.get_u32() != checksum {
81 bail!("meta checksum mismatched");
82 }
83
84 Ok(block_meta)
85 }
86}
87
/// A read-only handle to an SSTable file.
///
/// Field `0` is the opened file, or `None` when the object only carries a
/// size and has no backing file; field `1` is the file size in bytes.
pub struct FileObject(Option<File>, u64);
90
91impl FileObject {
92 pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
93 use std::os::unix::fs::FileExt;
94 let mut data = vec![0; len as usize];
95 self.0
96 .as_ref()
97 .unwrap()
98 .read_exact_at(&mut data[..], offset)?;
99 Ok(data)
100 }
101
102 pub fn size(&self) -> u64 {
103 self.1
104 }
105
106 pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
108 std::fs::write(path, &data)?;
109 File::open(path)?.sync_all()?;
110 Ok(FileObject(
111 Some(File::options().read(true).write(false).open(path)?),
112 data.len() as u64,
113 ))
114 }
115
116 pub fn open(path: &Path) -> Result<Self> {
117 let file = File::options().read(true).write(false).open(path)?;
118 let size = file.metadata()?.len();
119 Ok(FileObject(Some(file), size))
120 }
121}
122
123pub struct SsTable {
125 pub(crate) file: FileObject,
127 pub(crate) block_meta: Vec<BlockMeta>,
129 pub(crate) block_meta_offset: usize,
131 id: usize,
132 block_cache: Option<Arc<BlockCache>>,
133 first_key: KeyBytes,
134 last_key: KeyBytes,
135 pub(crate) bloom: Option<Bloom>,
136 max_ts: u64,
137}
138impl SsTable {
139 #[cfg(test)]
140 pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
141 Self::open(0, None, file)
142 }
143
144 pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
146 let len = file.size();
147 let raw_bloom_offset = file.read(len - 4, 4)?;
148 let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64;
149 let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?;
150 let bloom_filter = Bloom::decode(&raw_bloom)?;
151 let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
152 let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
153 let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
154 let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?;
155 Ok(Self {
156 file,
157 first_key: block_meta.first().unwrap().first_key.clone(),
158 last_key: block_meta.last().unwrap().last_key.clone(),
159 block_meta,
160 block_meta_offset: block_meta_offset as usize,
161 id,
162 block_cache,
163 bloom: Some(bloom_filter),
164 max_ts: 0,
165 })
166 }
167
168 pub fn create_meta_only(
170 id: usize,
171 file_size: u64,
172 first_key: KeyBytes,
173 last_key: KeyBytes,
174 ) -> Self {
175 Self {
176 file: FileObject(None, file_size),
177 block_meta: vec![],
178 block_meta_offset: 0,
179 id,
180 block_cache: None,
181 first_key,
182 last_key,
183 bloom: None,
184 max_ts: 0,
185 }
186 }
187
188 pub fn read_block(&self, block_idx: usize) -> Result<Arc<Block>> {
190 let offset = self.block_meta[block_idx].offset;
191 let offset_end = self
192 .block_meta
193 .get(block_idx + 1)
194 .map_or(self.block_meta_offset, |x| x.offset);
195 let block_len = offset_end - offset - 4;
196 let block_data_with_chksum: Vec<u8> = self
197 .file
198 .read(offset as u64, (offset_end - offset) as u64)?;
199 let block_data = &block_data_with_chksum[..block_len];
200 let checksum = (&block_data_with_chksum[block_len..]).get_u32();
201 if checksum != crc32fast::hash(block_data) {
202 bail!("block checksum mismatched");
203 }
204 Ok(Arc::new(Block::decode(block_data)))
205 }
206
207 pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
209 if let Some(ref block_cache) = self.block_cache {
210 let blk = block_cache
211 .try_get_with((self.id, block_idx), || self.read_block(block_idx))
212 .map_err(|e| anyhow!("{}", e))?;
213 Ok(blk)
214 } else {
215 self.read_block(block_idx)
216 }
217 }
218
219 pub fn find_block_idx(&self, key: KeySlice) -> usize {
221 self.block_meta
222 .partition_point(|meta| meta.first_key.as_key_slice() <= key)
223 .saturating_sub(1)
224 }
225
226 pub fn num_of_blocks(&self) -> usize {
228 self.block_meta.len()
229 }
230
231 pub fn first_key(&self) -> &KeyBytes {
232 &self.first_key
233 }
234
235 pub fn last_key(&self) -> &KeyBytes {
236 &self.last_key
237 }
238
239 pub fn table_size(&self) -> u64 {
240 self.file.1
241 }
242
243 pub fn sst_id(&self) -> usize {
244 self.id
245 }
246
247 pub fn max_ts(&self) -> u64 {
248 self.max_ts
249 }
250}