bitcasky_common/formatter/
mod.rs

1use std::{
2    fs::File,
3    io::{self, Read, Write},
4    ops::Deref,
5};
6
7use crate::storage_id::StorageId;
8
9use bytes::{BufMut, Bytes, BytesMut};
10use thiserror::Error;
11
12mod formatter_v1;
13pub use self::formatter_v1::FormatterV1;
14
/// Magic bytes that open every file header.
const MAGIC: &[u8; 3] = b"btk";
/// Version byte stored in the file header for the V1 formatter.
const FORMATTER_V1_VERSION: u8 = 1;
/// Length of the file header in bytes: magic (3) + version byte (1) + trailing `u32` (4).
pub const FILE_HEADER_SIZE: usize = 3 + 1 + 4;
18
/// Metadata describing a single row: its expiry timestamp and the lengths of
/// its key and value.
#[derive(PartialEq, Eq, Debug)]
pub struct RowMeta {
    /// Expiration timestamp of the row.
    // NOTE(review): unit and the meaning of 0 are not defined in this file — confirm.
    pub expire_timestamp: u64,
    /// Length of the key in bytes.
    pub key_size: usize,
    /// Length of the value in bytes.
    pub value_size: usize,
}
25
26#[derive(Debug, PartialEq, Eq)]
27pub struct RowHeader {
28    pub crc: u32,
29    pub meta: RowMeta,
30}
31
32#[derive(Debug)]
33pub struct RowToWrite<K: AsRef<[u8]>, V: Deref<Target = [u8]>> {
34    pub meta: RowMeta,
35    pub key: K,
36    pub value: V,
37}
38
39#[derive(PartialEq, Eq, Debug, Clone, Copy)]
40pub struct MergeMeta {
41    pub known_max_storage_id: StorageId,
42}
43
/// Fixed-size portion of a row hint.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct RowHintHeader {
    /// Expiration timestamp of the hinted row.
    pub expire_timestamp: u64,
    /// Size of the hinted row's key.
    pub key_size: usize,
    /// Offset of the hinted row.
    // NOTE(review): presumably a byte offset into the data file — confirm.
    pub row_offset: usize,
}
50
51#[derive(Debug, PartialEq, Eq, Clone)]
52pub struct RowHint {
53    pub header: RowHintHeader,
54    pub key: Vec<u8>,
55}
56
57impl<K: AsRef<[u8]>, V: Deref<Target = [u8]>> RowToWrite<K, V> {
58    pub fn new(key: K, value: V) -> RowToWrite<K, V> {
59        RowToWrite::new_with_timestamp(key, value, 0)
60    }
61
62    pub fn new_with_timestamp(key: K, value: V, expire_timestamp: u64) -> RowToWrite<K, V> {
63        let key_size = key.as_ref().len();
64        let value_size = value.len();
65        RowToWrite {
66            meta: RowMeta {
67                expire_timestamp,
68                key_size,
69                value_size,
70            },
71            key,
72            value,
73        }
74    }
75}
76
77#[derive(Error, Debug)]
78#[error("{}")]
79pub enum FormatterError {
80    #[error("Crc check failed. expect crc is: {expected_crc}, actual crc is: {actual_crc}")]
81    CrcCheckFailed { expected_crc: u32, actual_crc: u32 },
82    #[error("Got IO Error: {0}")]
83    IoError(#[from] std::io::Error),
84    #[error("Read file header failed: {1}")]
85    ReadFileHeaderFailed(#[source] io::Error, String),
86    #[error("Magic string does not match")]
87    MagicNotMatch(),
88    #[error("Unknown formatter version: {0}")]
89    UnknownFormatterVersion(u8),
90}
91
92pub type Result<T> = std::result::Result<T, FormatterError>;
93
94pub trait Formatter: std::marker::Send + 'static + Copy {
95    fn row_header_size(&self) -> usize;
96
97    fn net_row_size<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
98        &self,
99        row: &RowToWrite<K, V>,
100    ) -> usize;
101
102    fn encode_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
103        &self,
104        row: &RowToWrite<K, V>,
105        output: &mut [u8],
106    ) -> usize;
107
108    fn decode_row_header(&self, bs: &[u8]) -> RowHeader;
109
110    fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()>;
111
112    fn encode_row_hint(&self, hint: &RowHint, output: &mut [u8]) -> usize;
113
114    fn row_hint_header_size(&self) -> usize;
115
116    fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader;
117
118    fn merge_meta_size(&self) -> usize;
119
120    fn encode_merge_meta(&self, meta: &MergeMeta) -> Bytes;
121
122    fn decode_merge_meta(&self, meta: Bytes) -> MergeMeta;
123}
124
125#[derive(Clone, Copy, Debug, PartialEq)]
126pub enum BitcaskyFormatter {
127    V1(FormatterV1),
128}
129
130impl BitcaskyFormatter {
131    pub fn version(&self) -> u8 {
132        match self {
133            BitcaskyFormatter::V1(_) => FORMATTER_V1_VERSION,
134        }
135    }
136}
137
138impl Formatter for BitcaskyFormatter {
139    fn row_header_size(&self) -> usize {
140        match self {
141            BitcaskyFormatter::V1(f) => f.row_header_size(),
142        }
143    }
144
145    fn net_row_size<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
146        &self,
147        row: &RowToWrite<K, V>,
148    ) -> usize {
149        match self {
150            BitcaskyFormatter::V1(f) => f.net_row_size(row),
151        }
152    }
153
154    fn encode_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
155        &self,
156        row: &RowToWrite<K, V>,
157        output: &mut [u8],
158    ) -> usize {
159        match self {
160            BitcaskyFormatter::V1(f) => f.encode_row(row, output),
161        }
162    }
163
164    fn decode_row_header(&self, bs: &[u8]) -> RowHeader {
165        match self {
166            BitcaskyFormatter::V1(f) => f.decode_row_header(bs),
167        }
168    }
169
170    fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()> {
171        match self {
172            BitcaskyFormatter::V1(f) => f.validate_key_value(header, kv),
173        }
174    }
175
176    fn row_hint_header_size(&self) -> usize {
177        match self {
178            BitcaskyFormatter::V1(f) => f.row_hint_header_size(),
179        }
180    }
181
182    fn encode_row_hint(&self, hint: &RowHint, output: &mut [u8]) -> usize {
183        match self {
184            BitcaskyFormatter::V1(f) => f.encode_row_hint(hint, output),
185        }
186    }
187
188    fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader {
189        match self {
190            BitcaskyFormatter::V1(f) => f.decode_row_hint_header(header_bs),
191        }
192    }
193
194    fn merge_meta_size(&self) -> usize {
195        match self {
196            BitcaskyFormatter::V1(f) => f.merge_meta_size(),
197        }
198    }
199
200    fn encode_merge_meta(&self, meta: &MergeMeta) -> Bytes {
201        match self {
202            BitcaskyFormatter::V1(f) => f.encode_merge_meta(meta),
203        }
204    }
205
206    fn decode_merge_meta(&self, meta: Bytes) -> MergeMeta {
207        match self {
208            BitcaskyFormatter::V1(f) => f.decode_merge_meta(meta),
209        }
210    }
211}
212
213impl Default for BitcaskyFormatter {
214    fn default() -> Self {
215        BitcaskyFormatter::V1(FormatterV1::default())
216    }
217}
218
219pub fn initialize_new_file(file: &mut File, version: u8) -> std::io::Result<()> {
220    let mut bs = BytesMut::with_capacity(FILE_HEADER_SIZE);
221
222    bs.extend_from_slice(MAGIC);
223    bs.put_u8(version);
224    bs.put_u32(0);
225
226    file.write_all(&bs.freeze())?;
227    file.flush()?;
228    Ok(())
229}
230
231pub fn get_formatter_from_file(file: &mut File) -> Result<BitcaskyFormatter> {
232    let mut file_header = vec![0; FILE_HEADER_SIZE];
233
234    file.read_exact(&mut file_header)
235        .map_err(|e| FormatterError::ReadFileHeaderFailed(e, "read file header failed".into()))?;
236
237    if MAGIC != &file_header[0..3] {
238        return Err(FormatterError::MagicNotMatch());
239    }
240
241    let formatter_version = file_header[3];
242    if formatter_version == FORMATTER_V1_VERSION {
243        return Ok(BitcaskyFormatter::V1(FormatterV1::default()));
244    }
245
246    Err(FormatterError::UnknownFormatterVersion(formatter_version))
247}
248
/// Returns the number of padding bytes `p` (always in `0..8`) for which
/// `len + p` leaves a remainder of 4 when divided by 8.
///
/// For example `padding(0) == 4`, `padding(4) == 0` and `padding(5) == 7`.
// NOTE(review): this was previously described as ensuring 4-byte alignment,
// which does not match the arithmetic. Presumably the 4-byte offset accounts
// for a fixed-size field elsewhere in the row layout — confirm against the callers.
pub fn padding(len: usize) -> usize {
    const MODULUS: usize = 8;
    const TARGET_REMAINDER: usize = 4;
    // `len % MODULUS` is at most 7, so the subtraction cannot underflow.
    (MODULUS + TARGET_REMAINDER - len % MODULUS) % MODULUS
}
253
#[cfg(test)]
mod tests {
    use super::*;

    use crate::fs::{create_file, open_file, FileType};

    use test_log::test;
    use utilities::common::get_temporary_directory_path;

    /// A file initialised with the V1 header is recognised as V1 when reopened.
    #[test]
    fn test_formatter_v1_file() {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        let mut new_file = create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        let expected = BitcaskyFormatter::V1(FormatterV1::default());
        initialize_new_file(&mut new_file, expected.version()).unwrap();

        let opened = open_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        let mut reopened = opened.file;

        let actual = get_formatter_from_file(&mut reopened).unwrap();
        assert_matches!(actual, BitcaskyFormatter::V1(_));
        assert_eq!(expected, actual);
    }

    /// A file with nothing written to it cannot supply a full header.
    #[test]
    fn test_read_file_header_failed() {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();

        let opened = open_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        let mut reopened = opened.file;

        let err = get_formatter_from_file(&mut reopened).unwrap_err();
        assert_matches!(err, FormatterError::ReadFileHeaderFailed(_, _));
    }

    /// Leading bytes other than the magic bytes are rejected.
    #[test]
    fn test_invalid_magic_word() {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        let mut new_file = create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        new_file.write_all(b"bad magic word").unwrap();

        let opened = open_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        let mut reopened = opened.file;

        let err = get_formatter_from_file(&mut reopened).unwrap_err();
        assert_matches!(err, FormatterError::MagicNotMatch());
    }

    /// Correct magic bytes followed by an unrecognised version byte are rejected.
    #[test]
    fn test_unknown_formatter_version() {
        let dir = get_temporary_directory_path();
        let storage_id = 1;
        let mut new_file = create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        new_file.write_all(MAGIC).unwrap();
        new_file.write_all(b"invalid data").unwrap();

        let opened = open_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
        let mut reopened = opened.file;

        let err = get_formatter_from_file(&mut reopened).unwrap_err();
        assert_matches!(err, FormatterError::UnknownFormatterVersion(_));
    }
}
324}