mini_lsm_mvcc/
table.rs

1pub(crate) mod bloom;
2mod builder;
3mod iterator;
4
5use std::fs::File;
6use std::path::Path;
7use std::sync::Arc;
8
9use anyhow::{anyhow, bail, Result};
10pub use builder::SsTableBuilder;
11use bytes::{Buf, BufMut};
12pub use iterator::SsTableIterator;
13
14use crate::block::Block;
15use crate::key::{KeyBytes, KeySlice};
16use crate::lsm_storage::BlockCache;
17
18use self::bloom::Bloom;
19
/// Metadata describing a single data block of an SSTable.
///
/// A list of these is serialized by `BlockMeta::encode_block_meta` and restored by
/// `BlockMeta::decode_block_meta`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockMeta {
    /// Offset of this data block.
    ///
    /// `SsTable::read_block` uses it as the byte position in the file at which the block starts.
    /// The encoder stores it as a `u32`.
    pub offset: usize,
    /// The first key of the data block.
    pub first_key: KeyBytes,
    /// The last key of the data block.
    pub last_key: KeyBytes,
}
29
30impl BlockMeta {
31    /// Encode block meta to a buffer.
32    pub fn encode_block_meta(block_meta: &[BlockMeta], max_ts: u64, buf: &mut Vec<u8>) {
33        let mut estimated_size = std::mem::size_of::<u32>(); // number of blocks
34        for meta in block_meta {
35            // The size of offset
36            estimated_size += std::mem::size_of::<u32>();
37            // The size of key length
38            estimated_size += std::mem::size_of::<u16>();
39            // The size of actual key
40            estimated_size += meta.first_key.raw_len();
41            // The size of key length
42            estimated_size += std::mem::size_of::<u16>();
43            // The size of actual key
44            estimated_size += meta.last_key.raw_len();
45        }
46        estimated_size += std::mem::size_of::<u64>(); // max timestamp
47        estimated_size += std::mem::size_of::<u32>(); // checksum
48
49        // Reserve the space to improve performance, especially when the size of incoming data is
50        // large
51        buf.reserve(estimated_size);
52        let original_len = buf.len();
53        buf.put_u32(block_meta.len() as u32);
54        for meta in block_meta {
55            buf.put_u32(meta.offset as u32);
56            buf.put_u16(meta.first_key.key_len() as u16);
57            buf.put_slice(meta.first_key.key_ref());
58            buf.put_u64(meta.first_key.ts());
59            buf.put_u16(meta.last_key.key_len() as u16);
60            buf.put_slice(meta.last_key.key_ref());
61            buf.put_u64(meta.last_key.ts());
62        }
63        buf.put_u64(max_ts);
64        buf.put_u32(crc32fast::hash(&buf[original_len + 4..]));
65        assert_eq!(estimated_size, buf.len() - original_len);
66    }
67
68    /// Decode block meta from a buffer.
69    pub fn decode_block_meta(mut buf: &[u8]) -> Result<(Vec<BlockMeta>, u64)> {
70        let mut block_meta = Vec::new();
71        let num = buf.get_u32() as usize;
72        let checksum = crc32fast::hash(&buf[..buf.remaining() - 4]);
73        for _ in 0..num {
74            let offset = buf.get_u32() as usize;
75            let first_key_len = buf.get_u16() as usize;
76            let first_key =
77                KeyBytes::from_bytes_with_ts(buf.copy_to_bytes(first_key_len), buf.get_u64());
78            let last_key_len: usize = buf.get_u16() as usize;
79            let last_key =
80                KeyBytes::from_bytes_with_ts(buf.copy_to_bytes(last_key_len), buf.get_u64());
81            block_meta.push(BlockMeta {
82                offset,
83                first_key,
84                last_key,
85            });
86        }
87        let max_ts = buf.get_u64();
88        if buf.get_u32() != checksum {
89            bail!("meta checksum mismatched");
90        }
91
92        Ok((block_meta, max_ts))
93    }
94}
95
/// A file object.
///
/// Field `0` is the file handle that `read` reads from. It is `None` for the placeholder built
/// by `SsTable::create_meta_only`. Field `1` is the size in bytes that `size` reports.
pub struct FileObject(Option<File>, u64);
98
99impl FileObject {
100    pub fn read(&self, offset: u64, len: u64) -> Result<Vec<u8>> {
101        use std::os::unix::fs::FileExt;
102        let mut data = vec![0; len as usize];
103        self.0
104            .as_ref()
105            .unwrap()
106            .read_exact_at(&mut data[..], offset)?;
107        Ok(data)
108    }
109
    /// Returns the size in bytes recorded when this object was created or opened.
    pub fn size(&self) -> u64 {
        self.1
    }
113
114    /// Create a new file object (day 2) and write the file to the disk (day 4).
115    pub fn create(path: &Path, data: Vec<u8>) -> Result<Self> {
116        std::fs::write(path, &data)?;
117        File::open(path)?.sync_all()?;
118        Ok(FileObject(
119            Some(File::options().read(true).write(false).open(path)?),
120            data.len() as u64,
121        ))
122    }
123
124    pub fn open(path: &Path) -> Result<Self> {
125        let file = File::options().read(true).write(false).open(path)?;
126        let size = file.metadata()?.len();
127        Ok(FileObject(Some(file), size))
128    }
129}
130
/// An SSTable.
///
/// As read back by `SsTable::open` and `SsTable::read_block`, the file is laid out as:
///
/// ```text
/// | data blocks (each followed by a u32 checksum) | block meta | block meta offset: u32 |
/// | bloom filter | bloom filter offset: u32 |
/// ```
pub struct SsTable {
    /// The actual storage unit of SsTable, laid out as described on the struct.
    pub(crate) file: FileObject,
    /// The meta blocks that hold info for data blocks.
    pub(crate) block_meta: Vec<BlockMeta>,
    /// The offset that indicates the start point of meta blocks in `file`.
    pub(crate) block_meta_offset: usize,
    /// Identifier of this table; paired with a block index it forms the block cache key.
    id: usize,
    /// Optional cache consulted by `read_block_cached`.
    block_cache: Option<Arc<BlockCache>>,
    /// First key of the table: the first key of the first block when opened from a file.
    first_key: KeyBytes,
    /// Last key of the table: the last key of the last block when opened from a file.
    last_key: KeyBytes,
    /// Bloom filter decoded from the file; `None` for tables built by `create_meta_only`.
    pub(crate) bloom: Option<Bloom>,
    /// Max timestamp decoded from the block meta section; `0` for `create_meta_only` tables.
    max_ts: u64,
}
146impl SsTable {
147    #[cfg(test)]
148    pub(crate) fn open_for_test(file: FileObject) -> Result<Self> {
149        Self::open(0, None, file)
150    }
151
152    /// Open SSTable from a file.
153    pub fn open(id: usize, block_cache: Option<Arc<BlockCache>>, file: FileObject) -> Result<Self> {
154        let len = file.size();
155        let raw_bloom_offset = file.read(len - 4, 4)?;
156        let bloom_offset = (&raw_bloom_offset[..]).get_u32() as u64;
157        let raw_bloom = file.read(bloom_offset, len - 4 - bloom_offset)?;
158        let bloom_filter = Bloom::decode(&raw_bloom)?;
159        let raw_meta_offset = file.read(bloom_offset - 4, 4)?;
160        let block_meta_offset = (&raw_meta_offset[..]).get_u32() as u64;
161        let raw_meta = file.read(block_meta_offset, bloom_offset - 4 - block_meta_offset)?;
162        let (block_meta, max_ts) = BlockMeta::decode_block_meta(&raw_meta[..])?;
163        Ok(Self {
164            file,
165            first_key: block_meta.first().unwrap().first_key.clone(),
166            last_key: block_meta.last().unwrap().last_key.clone(),
167            block_meta,
168            block_meta_offset: block_meta_offset as usize,
169            id,
170            block_cache,
171            bloom: Some(bloom_filter),
172            max_ts,
173        })
174    }
175
176    /// Create a mock SST with only first key + last key metadata
177    pub fn create_meta_only(
178        id: usize,
179        file_size: u64,
180        first_key: KeyBytes,
181        last_key: KeyBytes,
182    ) -> Self {
183        Self {
184            file: FileObject(None, file_size),
185            block_meta: vec![],
186            block_meta_offset: 0,
187            id,
188            block_cache: None,
189            first_key,
190            last_key,
191            bloom: None,
192            max_ts: 0,
193        }
194    }
195
196    /// Read a block from the disk.
197    pub fn read_block(&self, block_idx: usize) -> Result<Arc<Block>> {
198        let offset = self.block_meta[block_idx].offset;
199        let offset_end = self
200            .block_meta
201            .get(block_idx + 1)
202            .map_or(self.block_meta_offset, |x| x.offset);
203        let block_len = offset_end - offset - 4;
204        let block_data_with_chksum: Vec<u8> = self
205            .file
206            .read(offset as u64, (offset_end - offset) as u64)?;
207        let block_data = &block_data_with_chksum[..block_len];
208        let checksum = (&block_data_with_chksum[block_len..]).get_u32();
209        if checksum != crc32fast::hash(block_data) {
210            bail!("block checksum mismatched");
211        }
212        Ok(Arc::new(Block::decode(block_data)))
213    }
214
215    /// Read a block from disk, with block cache.
216    pub fn read_block_cached(&self, block_idx: usize) -> Result<Arc<Block>> {
217        if let Some(ref block_cache) = self.block_cache {
218            let blk = block_cache
219                .try_get_with((self.id, block_idx), || self.read_block(block_idx))
220                .map_err(|e| anyhow!("{}", e))?;
221            Ok(blk)
222        } else {
223            self.read_block(block_idx)
224        }
225    }
226
227    /// Find the block that may contain `key`.
228    pub fn find_block_idx(&self, key: KeySlice) -> usize {
229        self.block_meta
230            .partition_point(|meta| meta.first_key.as_key_slice() <= key)
231            .saturating_sub(1)
232    }
233
    /// Get number of data blocks, i.e. the number of block meta entries held by this table.
    pub fn num_of_blocks(&self) -> usize {
        self.block_meta.len()
    }
238
    /// Returns the first key of the table.
    ///
    /// For a table opened from a file this is the first key of its first block; for a
    /// `create_meta_only` table it is the key supplied by the caller.
    pub fn first_key(&self) -> &KeyBytes {
        &self.first_key
    }
242
    /// Returns the last key of the table.
    ///
    /// For a table opened from a file this is the last key of its last block; for a
    /// `create_meta_only` table it is the key supplied by the caller.
    pub fn last_key(&self) -> &KeyBytes {
        &self.last_key
    }
246
247    pub fn table_size(&self) -> u64 {
248        self.file.1
249    }
250
    /// Returns the id this table was opened or created with.
    pub fn sst_id(&self) -> usize {
        self.id
    }
254
    /// Returns the max timestamp decoded from the block meta section.
    ///
    /// Tables built by `create_meta_only` always report `0`.
    pub fn max_ts(&self) -> u64 {
        self.max_ts
    }
258}