mini_lsm/
table.rs

1pub(crate) mod bloom;
2mod builder;
3mod iterator;
4
5use std::fs::File;
6use std::path::Path;
7use std::sync::Arc;
8
9use anyhow::{anyhow, bail, Result};
10pub use builder::SsTableBuilder;
11use bytes::{Buf, BufMut};
12pub use iterator::SsTableIterator;
13
14use crate::block::Block;
15use crate::key::{KeyBytes, KeySlice};
16use crate::lsm_storage::BlockCache;
17
18use self::bloom::Bloom;
19
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub struct BlockMeta {
22    /// Offset of this data block.
23    pub offset: usize,
24    /// The first key of the data block.
25    pub first_key: KeyBytes,
26    /// The last key of the data block.
27    pub last_key: KeyBytes,
28}
29
30impl BlockMeta {
31    /// Encode block meta to a buffer.
32    pub fn encode_block_meta(block_meta: &[BlockMeta], buf: &mut Vec<u8>) {
33        let mut estimated_size = std::mem::size_of::<u32>();
34        for meta in block_meta {
35            // The size of offset
36            estimated_size += std::mem::size_of::<u32>();
37            // The size of key length
38            estimated_size += std::mem::size_of::<u16>();
39            // The size of actual key
40            estimated_size += meta.first_key.len();
41            // The size of key length
42            estimated_size += std::mem::size_of::<u16>();
43            // The size of actual key
44            estimated_size += meta.last_key.len();
45        }
46        estimated_size += std::mem::size_of::<u32>();
47        // Reserve the space to improve performance, especially when the size of incoming data is
48        // large
49        buf.reserve(estimated_size);
50        let original_len = buf.len();
51        buf.put_u32(block_meta.len() as u32);
52        for meta in block_meta {
53            buf.put_u32(meta.offset as u32);
54            buf.put_u16(meta.first_key.len() as u16);
55            buf.put_slice(meta.first_key.raw_ref());
56            buf.put_u16(meta.last_key.len() as u16);
57            buf.put_slice(meta.last_key.raw_ref());
58        }
59        buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
60        assert_eq!(estimated_size, buf.len() - original_len);
61    }
62
63    /// Decode block meta from a buffer.
64    pub fn decode_block_meta(mut buf: &[u8]) -> Result<Vec<BlockMeta>> {
65        let mut block_meta = Vec::new();
66        let num = buf.get_u32() as usize;
67        let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
68        for _ in 0..num {
69            let offset = buf.get_u32() as usize;
70            let first_key_len = buf.get_u16() as usize;
71            let first_key = KeyBytes::from_bytes(buf.copy_to_bytes(first_key_len));
72            let last_key_len: usize = buf.get_u16() as usize;
73            let last_key = KeyBytes::from_bytes(buf.copy_to_bytes(last_key_len));
74            block_meta.push(BlockMeta {
75                offset,
76                first_key,
77                last_key,
78            });
79        }
80        if buf.get_u32() != checksum {
81            bail!("meta checksum mismatched");
82        }
83
84        Ok(block_meta)
85    }
86}
87
88/// A file object.
89pub struct FileObject(Option<File>, u64);
90
91impl FileObject {
92    pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
93        use std::os::unix::fs::FileExt;
94        let mut data = vec![0; len as usize];
95        self.0
96            .as_ref()
97            .unwrap()
98            .read_exact_at(&mut data[..], offset)?;
99        Ok(data)
100    }
101
102    pub fn size(&self) -> u64 {
103        self.1
104    }
105
106    /// Create a new file object (day 2) and write the file to the disk (day 4).
107    pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
108        std::fs::write(path, &data)?;
109        File::open(path)?.sync_all()?;
110        Ok(FileObject(
111            Some(File::options().read(true).write(false).open(path)?),
112            data.len() as u64,
113        ))
114    }
115
116    pub fn open(path: &Path) -> Result<Self> {
117        let file = File::options().read(true).write(false).open(path)?;
118        let size = file.metadata()?.len();
119        Ok(FileObject(Some(file), size))
120    }
121}
122
123/// An SSTable.
124pub struct SsTable {
125    /// The actual storage unit of SsTable, the format is as above.
126    pub(crate) file: FileObject,
127    /// The meta blocks that hold info for data blocks.
128    pub(crate) block_meta: Vec<BlockMeta>,
129    /// The offset that indicates the start point of meta blocks in `file`.
130    pub(crate) block_meta_offset: usize,
131    id: usize,
132    block_cache: Option<Arc<BlockCache>>,
133    first_key: KeyBytes,
134    last_key: KeyBytes,
135    pub(crate) bloom: Option<Bloom>,
136    max_ts: u64,
137}
138impl SsTable {
139    #[cfg(test)]
140    pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
141        Self::open(0, None, file)
142    }
143
144    /// Open SSTable from a file.
145    pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
146        let len = file.size();
147        let raw_bloom_offset = file.read(len - 4, 4)?;
148        let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64;
149        let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?;
150        let bloom_filter = Bloom::decode(&raw_bloom)?;
151        let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
152        let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
153        let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
154        let block_meta = BlockMeta::decode_block_meta(&raw_meta[..])?;
155        Ok(Self {
156            file,
157            first_key: block_meta.first().unwrap().first_key.clone(),
158            last_key: block_meta.last().unwrap().last_key.clone(),
159            block_meta,
160            block_meta_offset: block_meta_offset as usize,
161            id,
162            block_cache,
163            bloom: Some(bloom_filter),
164            max_ts: 0,
165        })
166    }
167
168    /// Create a mock SST with only first key + last key metadata
169    pub fn create_meta_only(
170        id: usize,
171        file_size: u64,
172        first_key: KeyBytes,
173        last_key: KeyBytes,
174    ) -> Self {
175        Self {
176            file: FileObject(None, file_size),
177            block_meta: vec![],
178            block_meta_offset: 0,
179            id,
180            block_cache: None,
181            first_key,
182            last_key,
183            bloom: None,
184            max_ts: 0,
185        }
186    }
187
188    /// Read a block from the disk.
189    pub fn read_block(&self, block_idx: usize) -> Result<Arc<Block>> {
190        let offset = self.block_meta[block_idx].offset;
191        let offset_end = self
192            .block_meta
193            .get(block_idx + 1)
194            .map_or(self.block_meta_offset, |x| x.offset);
195        let block_len = offset_end - offset - 4;
196        let block_data_with_chksum: Vec<u8> = self
197            .file
198            .read(offset as u64, (offset_end - offset) as u64)?;
199        let block_data = &block_data_with_chksum[..block_len];
200        let checksum = (&block_data_with_chksum[block_len..]).get_u32();
201        if checksum != crc32fast::hash(block_data) {
202            bail!("block checksum mismatched");
203        }
204        Ok(Arc::new(Block::decode(block_data)))
205    }
206
207    /// Read a block from disk, with block cache.
208    pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
209        if let Some(ref block_cache) = self.block_cache {
210            let blk = block_cache
211                .try_get_with((self.id, block_idx), || self.read_block(block_idx))
212                .map_err(|e| anyhow!("{}", e))?;
213            Ok(blk)
214        } else {
215            self.read_block(block_idx)
216        }
217    }
218
219    /// Find the block that may contain `key`.
220    pub fn find_block_idx(&self, key: KeySlice) -> usize {
221        self.block_meta
222            .partition_point(|meta| meta.first_key.as_key_slice() <= key)
223            .saturating_sub(1)
224    }
225
226    /// Get number of data blocks.
227    pub fn num_of_blocks(&self) -> usize {
228        self.block_meta.len()
229    }
230
231    pub fn first_key(&self) -> &KeyBytes {
232        &self.first_key
233    }
234
235    pub fn last_key(&self) -> &KeyBytes {
236        &self.last_key
237    }
238
239    pub fn table_size(&self) -> u64 {
240        self.file.1
241    }
242
243    pub fn sst_id(&self) -> usize {
244        self.id
245    }
246
247    pub fn max_ts(&self) -> u64 {
248        self.max_ts
249    }
250}