bitcasky_common/formatter/
mod.rs1use std::{
2 fs::File,
3 io::{self, Read, Write},
4 ops::Deref,
5};
6
7use crate::storage_id::StorageId;
8
9use bytes::{BufMut, Bytes, BytesMut};
10use thiserror::Error;
11
12mod formatter_v1;
13pub use self::formatter_v1::FormatterV1;
14
15const MAGIC: &[u8; 3] = b"btk";
16const FORMATTER_V1_VERSION: u8 = 1;
17pub const FILE_HEADER_SIZE: usize = 8;
18
19#[derive(Debug, PartialEq, Eq)]
20pub struct RowMeta {
21 pub expire_timestamp: u64,
22 pub key_size: usize,
23 pub value_size: usize,
24}
25
26#[derive(Debug, PartialEq, Eq)]
27pub struct RowHeader {
28 pub crc: u32,
29 pub meta: RowMeta,
30}
31
32#[derive(Debug)]
33pub struct RowToWrite<K: AsRef<[u8]>, V: Deref<Target = [u8]>> {
34 pub meta: RowMeta,
35 pub key: K,
36 pub value: V,
37}
38
39#[derive(PartialEq, Eq, Debug, Clone, Copy)]
40pub struct MergeMeta {
41 pub known_max_storage_id: StorageId,
42}
43
44#[derive(Debug, PartialEq, Eq, Clone)]
45pub struct RowHintHeader {
46 pub expire_timestamp: u64,
47 pub key_size: usize,
48 pub row_offset: usize,
49}
50
51#[derive(Debug, PartialEq, Eq, Clone)]
52pub struct RowHint {
53 pub header: RowHintHeader,
54 pub key: Vec<u8>,
55}
56
57impl<K: AsRef<[u8]>, V: Deref<Target = [u8]>> RowToWrite<K, V> {
58 pub fn new(key: K, value: V) -> RowToWrite<K, V> {
59 RowToWrite::new_with_timestamp(key, value, 0)
60 }
61
62 pub fn new_with_timestamp(key: K, value: V, expire_timestamp: u64) -> RowToWrite<K, V> {
63 let key_size = key.as_ref().len();
64 let value_size = value.len();
65 RowToWrite {
66 meta: RowMeta {
67 expire_timestamp,
68 key_size,
69 value_size,
70 },
71 key,
72 value,
73 }
74 }
75}
76
77#[derive(Error, Debug)]
78#[error("{}")]
79pub enum FormatterError {
80 #[error("Crc check failed. expect crc is: {expected_crc}, actual crc is: {actual_crc}")]
81 CrcCheckFailed { expected_crc: u32, actual_crc: u32 },
82 #[error("Got IO Error: {0}")]
83 IoError(#[from] std::io::Error),
84 #[error("Read file header failed: {1}")]
85 ReadFileHeaderFailed(#[source] io::Error, String),
86 #[error("Magic string does not match")]
87 MagicNotMatch(),
88 #[error("Unknown formatter version: {0}")]
89 UnknownFormatterVersion(u8),
90}
91
92pub type Result<T> = std::result::Result<T, FormatterError>;
93
94pub trait Formatter: std::marker::Send + 'static + Copy {
95 fn row_header_size(&self) -> usize;
96
97 fn net_row_size<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
98 &self,
99 row: &RowToWrite<K, V>,
100 ) -> usize;
101
102 fn encode_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
103 &self,
104 row: &RowToWrite<K, V>,
105 output: &mut [u8],
106 ) -> usize;
107
108 fn decode_row_header(&self, bs: &[u8]) -> RowHeader;
109
110 fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()>;
111
112 fn encode_row_hint(&self, hint: &RowHint, output: &mut [u8]) -> usize;
113
114 fn row_hint_header_size(&self) -> usize;
115
116 fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader;
117
118 fn merge_meta_size(&self) -> usize;
119
120 fn encode_merge_meta(&self, meta: &MergeMeta) -> Bytes;
121
122 fn decode_merge_meta(&self, meta: Bytes) -> MergeMeta;
123}
124
125#[derive(Clone, Copy, Debug, PartialEq)]
126pub enum BitcaskyFormatter {
127 V1(FormatterV1),
128}
129
130impl BitcaskyFormatter {
131 pub fn version(&self) -> u8 {
132 match self {
133 BitcaskyFormatter::V1(_) => FORMATTER_V1_VERSION,
134 }
135 }
136}
137
138impl Formatter for BitcaskyFormatter {
139 fn row_header_size(&self) -> usize {
140 match self {
141 BitcaskyFormatter::V1(f) => f.row_header_size(),
142 }
143 }
144
145 fn net_row_size<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
146 &self,
147 row: &RowToWrite<K, V>,
148 ) -> usize {
149 match self {
150 BitcaskyFormatter::V1(f) => f.net_row_size(row),
151 }
152 }
153
154 fn encode_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
155 &self,
156 row: &RowToWrite<K, V>,
157 output: &mut [u8],
158 ) -> usize {
159 match self {
160 BitcaskyFormatter::V1(f) => f.encode_row(row, output),
161 }
162 }
163
164 fn decode_row_header(&self, bs: &[u8]) -> RowHeader {
165 match self {
166 BitcaskyFormatter::V1(f) => f.decode_row_header(bs),
167 }
168 }
169
170 fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()> {
171 match self {
172 BitcaskyFormatter::V1(f) => f.validate_key_value(header, kv),
173 }
174 }
175
176 fn row_hint_header_size(&self) -> usize {
177 match self {
178 BitcaskyFormatter::V1(f) => f.row_hint_header_size(),
179 }
180 }
181
182 fn encode_row_hint(&self, hint: &RowHint, output: &mut [u8]) -> usize {
183 match self {
184 BitcaskyFormatter::V1(f) => f.encode_row_hint(hint, output),
185 }
186 }
187
188 fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader {
189 match self {
190 BitcaskyFormatter::V1(f) => f.decode_row_hint_header(header_bs),
191 }
192 }
193
194 fn merge_meta_size(&self) -> usize {
195 match self {
196 BitcaskyFormatter::V1(f) => f.merge_meta_size(),
197 }
198 }
199
200 fn encode_merge_meta(&self, meta: &MergeMeta) -> Bytes {
201 match self {
202 BitcaskyFormatter::V1(f) => f.encode_merge_meta(meta),
203 }
204 }
205
206 fn decode_merge_meta(&self, meta: Bytes) -> MergeMeta {
207 match self {
208 BitcaskyFormatter::V1(f) => f.decode_merge_meta(meta),
209 }
210 }
211}
212
213impl Default for BitcaskyFormatter {
214 fn default() -> Self {
215 BitcaskyFormatter::V1(FormatterV1::default())
216 }
217}
218
219pub fn initialize_new_file(file: &mut File, version: u8) -> std::io::Result<()> {
220 let mut bs = BytesMut::with_capacity(FILE_HEADER_SIZE);
221
222 bs.extend_from_slice(MAGIC);
223 bs.put_u8(version);
224 bs.put_u32(0);
225
226 file.write_all(&bs.freeze())?;
227 file.flush()?;
228 Ok(())
229}
230
231pub fn get_formatter_from_file(file: &mut File) -> Result<BitcaskyFormatter> {
232 let mut file_header = vec![0; FILE_HEADER_SIZE];
233
234 file.read_exact(&mut file_header)
235 .map_err(|e| FormatterError::ReadFileHeaderFailed(e, "read file header failed".into()))?;
236
237 if MAGIC != &file_header[0..3] {
238 return Err(FormatterError::MagicNotMatch());
239 }
240
241 let formatter_version = file_header[3];
242 if formatter_version == FORMATTER_V1_VERSION {
243 return Ok(BitcaskyFormatter::V1(FormatterV1::default()));
244 }
245
246 Err(FormatterError::UnknownFormatterVersion(formatter_version))
247}
248
249pub fn padding(len: usize) -> usize {
251 4usize.wrapping_sub(len) & 7
252}
253
254#[cfg(test)]
255mod tests {
256 use crate::fs::{create_file, open_file, FileType};
257
258 use super::*;
259
260 use test_log::test;
261 use utilities::common::get_temporary_directory_path;
262
263 #[test]
264 fn test_formatter_v1_file() {
265 let dir = get_temporary_directory_path();
266 let storage_id = 1;
267 let mut file = create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
268 let init_formatter = BitcaskyFormatter::V1(FormatterV1::default());
269 initialize_new_file(&mut file, init_formatter.version()).unwrap();
270
271 let mut file = open_file(&dir, FileType::DataFile, Some(storage_id))
272 .unwrap()
273 .file;
274
275 let read_formatter = get_formatter_from_file(&mut file).unwrap();
276 assert_matches!(read_formatter, BitcaskyFormatter::V1(_));
277 assert_eq!(init_formatter, read_formatter);
278 }
279
280 #[test]
281 fn test_read_file_header_failed() {
282 let dir = get_temporary_directory_path();
283 let storage_id = 1;
284 create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
285
286 let mut file = open_file(&dir, FileType::DataFile, Some(storage_id))
287 .unwrap()
288 .file;
289
290 let read_formatter = get_formatter_from_file(&mut file).unwrap_err();
291 assert_matches!(read_formatter, FormatterError::ReadFileHeaderFailed(_, _));
292 }
293
294 #[test]
295 fn test_invalid_magic_word() {
296 let dir = get_temporary_directory_path();
297 let storage_id = 1;
298 let mut file = create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
299 file.write_all(b"bad magic word").unwrap();
300
301 let mut file = open_file(&dir, FileType::DataFile, Some(storage_id))
302 .unwrap()
303 .file;
304
305 let read_formatter = get_formatter_from_file(&mut file).unwrap_err();
306 assert_matches!(read_formatter, FormatterError::MagicNotMatch());
307 }
308
309 #[test]
310 fn test_unknown_formatter_version() {
311 let dir = get_temporary_directory_path();
312 let storage_id = 1;
313 let mut file = create_file(&dir, FileType::DataFile, Some(storage_id)).unwrap();
314 file.write_all(MAGIC).unwrap();
315 file.write_all(b"invalid data").unwrap();
316
317 let mut file = open_file(&dir, FileType::DataFile, Some(storage_id))
318 .unwrap()
319 .file;
320
321 let read_formatter = get_formatter_from_file(&mut file).unwrap_err();
322 assert_matches!(read_formatter, FormatterError::UnknownFormatterVersion(_));
323 }
324}