// bitcasky_common/formatter/formatter_v1.rs

1use std::ops::Deref;
2
3use byteorder::{ByteOrder, LittleEndian};
4use bytes::{Buf, Bytes};
5use crc::{Crc, CRC_32_CKSUM};
6
7use crate::copy_memory;
8
9use super::{
10    Formatter, FormatterError, MergeMeta, Result, RowHeader, RowHintHeader, RowMeta, RowToWrite,
11};
12
// ---- Data-file row layout --------------------------------------------------
// | crc: u32 | expire_timestamp: u64 | key_size: u64 | value_size: u64 | key | value |

/// Width of the row checksum field.
const CRC_SIZE: usize = std::mem::size_of::<u32>();
/// Width of the expire-timestamp field (used by both data-file rows and hint entries).
const TSTAMP_SIZE: usize = std::mem::size_of::<u64>();
/// Width of the encoded key length.
const KEY_SIZE_SIZE: usize = std::mem::size_of::<u64>();
/// Width of the encoded value length.
const VALUE_SIZE_SIZE: usize = std::mem::size_of::<u64>();
const DATA_FILE_TSTAMP_OFFSET: usize = CRC_SIZE;
const DATA_FILE_KEY_SIZE_OFFSET: usize = DATA_FILE_TSTAMP_OFFSET + TSTAMP_SIZE;
const DATA_FILE_VALUE_SIZE_OFFSET: usize = DATA_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE;
/// First key byte; equivalently, the size of the fixed data-file row header.
const DATA_FILE_KEY_OFFSET: usize = DATA_FILE_VALUE_SIZE_OFFSET + VALUE_SIZE_SIZE;

// ---- Hint-file entry layout ------------------------------------------------
// | expire_timestamp: u64 | key_size: u64 | row_offset: u64 | key |

/// Width of the encoded row offset in a hint entry.
const ROW_OFFSET_SIZE: usize = std::mem::size_of::<u64>();
const HINT_FILE_KEY_SIZE_OFFSET: usize = TSTAMP_SIZE;
const HINT_FILE_ROW_OFFSET_OFFSET: usize = HINT_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE;
const HINT_FILE_KEY_OFFSET: usize = HINT_FILE_ROW_OFFSET_OFFSET + ROW_OFFSET_SIZE;
/// The hint header ends exactly where the key begins.
const HINT_FILE_HEADER_SIZE: usize = HINT_FILE_KEY_OFFSET;

/// Encoded merge metadata is a single `u32` (`known_max_storage_id`).
const MERGE_META_FILE_SIZE: usize = std::mem::size_of::<u32>();

/// Version 1 of the binary layout for data-file rows, hint-file entries and
/// merge metadata.
///
/// The type carries no state: all layout information lives in the module-level
/// offset constants, so every instance is interchangeable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct FormatterV1 {}

33impl FormatterV1 {
34    fn gen_crc<V: Deref<Target = [u8]>>(&self, meta: &RowMeta, key: &[u8], value: &V) -> u32 {
35        let crc32 = Crc::<u32>::new(&CRC_32_CKSUM);
36        let mut ck = crc32.digest();
37        ck.update(&meta.expire_timestamp.to_be_bytes());
38        ck.update(&meta.key_size.to_be_bytes());
39        ck.update(&value.len().to_be_bytes());
40        ck.update(key.as_ref());
41        ck.update(value);
42        ck.finalize()
43    }
44
45    fn gen_crc_by_kv_bytes(&self, meta: &RowMeta, kv: &[u8]) -> u32 {
46        let crc32 = Crc::<u32>::new(&CRC_32_CKSUM);
47        let mut ck = crc32.digest();
48        ck.update(&meta.expire_timestamp.to_be_bytes());
49        ck.update(&meta.key_size.to_be_bytes());
50        ck.update(&meta.value_size.to_be_bytes());
51        ck.update(kv);
52        ck.finalize()
53    }
54}
55
56impl Formatter for FormatterV1 {
57    fn row_header_size(&self) -> usize {
58        DATA_FILE_KEY_OFFSET
59    }
60
61    fn net_row_size<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
62        &self,
63        row: &RowToWrite<K, V>,
64    ) -> usize {
65        self.row_header_size() + row.key.as_ref().len() + row.value.len()
66    }
67
68    fn encode_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
69        &self,
70        row: &RowToWrite<K, V>,
71        bs: &mut [u8],
72    ) -> usize {
73        let crc = self.gen_crc(&row.meta, row.key.as_ref(), &row.value);
74        LittleEndian::write_u32(bs, crc);
75        LittleEndian::write_u64(&mut bs[4..], row.meta.expire_timestamp);
76        LittleEndian::write_u64(&mut bs[12..], row.meta.key_size as u64);
77        LittleEndian::write_u64(&mut bs[20..], row.meta.value_size as u64);
78        copy_memory(row.key.as_ref(), &mut bs[28..]);
79        copy_memory(&row.value, &mut bs[28 + row.key.as_ref().len()..]);
80        self.net_row_size(row)
81    }
82
83    fn decode_row_header(&self, bs: &[u8]) -> RowHeader {
84        let expected_crc = LittleEndian::read_u32(&bs[0..DATA_FILE_TSTAMP_OFFSET]);
85        let timestamp =
86            LittleEndian::read_u64(&bs[DATA_FILE_TSTAMP_OFFSET..DATA_FILE_KEY_SIZE_OFFSET]);
87        let key_size = LittleEndian::read_u64(
88            &bs[DATA_FILE_KEY_SIZE_OFFSET..(DATA_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE)],
89        ) as usize;
90        let val_size = LittleEndian::read_u64(
91            &bs[DATA_FILE_VALUE_SIZE_OFFSET..(DATA_FILE_VALUE_SIZE_OFFSET + VALUE_SIZE_SIZE)],
92        ) as usize;
93        RowHeader {
94            crc: expected_crc,
95            meta: RowMeta {
96                expire_timestamp: timestamp,
97                key_size,
98                value_size: val_size,
99            },
100        }
101    }
102
103    fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()> {
104        let actual_crc = self.gen_crc_by_kv_bytes(&header.meta, kv);
105        if header.crc != actual_crc {
106            return Err(FormatterError::CrcCheckFailed {
107                expected_crc: header.crc,
108                actual_crc,
109            });
110        }
111        Ok(())
112    }
113
114    fn encode_row_hint(&self, hint: &super::RowHint, output: &mut [u8]) -> usize {
115        let header = &hint.header;
116
117        LittleEndian::write_u64(output, header.expire_timestamp);
118        LittleEndian::write_u64(&mut output[8..], header.key_size as u64);
119        LittleEndian::write_u64(&mut output[16..], header.row_offset as u64);
120        copy_memory(&hint.key, &mut output[24..]);
121        HINT_FILE_HEADER_SIZE + hint.key.len()
122    }
123
124    fn row_hint_header_size(&self) -> usize {
125        HINT_FILE_HEADER_SIZE
126    }
127
128    fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader {
129        let timestamp = LittleEndian::read_u64(&header_bs[0..TSTAMP_SIZE]);
130        let key_size = LittleEndian::read_u64(
131            &header_bs[HINT_FILE_KEY_SIZE_OFFSET..HINT_FILE_ROW_OFFSET_OFFSET],
132        ) as usize;
133        let row_offset =
134            LittleEndian::read_u64(&header_bs[HINT_FILE_ROW_OFFSET_OFFSET..HINT_FILE_KEY_OFFSET])
135                as usize;
136        RowHintHeader {
137            expire_timestamp: timestamp,
138            key_size,
139            row_offset,
140        }
141    }
142
143    fn merge_meta_size(&self) -> usize {
144        MERGE_META_FILE_SIZE
145    }
146
147    fn encode_merge_meta(&self, meta: &super::MergeMeta) -> Bytes {
148        Bytes::copy_from_slice(&meta.known_max_storage_id.to_be_bytes())
149    }
150
151    fn decode_merge_meta(&self, mut meta: Bytes) -> MergeMeta {
152        let known_max_storage_id = meta.get_u32();
153        MergeMeta {
154            known_max_storage_id,
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use crate::formatter::RowHint;

    use super::*;

    use test_log::test;

    #[test]
    fn test_encode_decode_merge_meta() {
        let merge_meta = MergeMeta {
            known_max_storage_id: 123,
        };

        let formatter = FormatterV1 {};
        let bytes = formatter.encode_merge_meta(&merge_meta);
        assert_eq!(formatter.merge_meta_size(), bytes.len());
        // Pin the on-disk byte order: the storage id is stored big-endian.
        assert_eq!(&[0_u8, 0, 0, 123][..], &bytes[..]);
        assert_eq!(merge_meta, formatter.decode_merge_meta(bytes));
    }

    #[test]
    fn test_encode_decode_row_hint() {
        let k = b"Hello".to_vec();
        let hint = RowHint {
            header: RowHintHeader {
                expire_timestamp: 12345,
                key_size: k.len(),
                row_offset: 56789,
            },
            key: k,
        };

        let formatter = FormatterV1 {};
        let mut bs: Vec<u8> = vec![0_u8; 2048];
        let written = formatter.encode_row_hint(&hint, bs.as_mut());

        let header_size = formatter.row_hint_header_size();
        assert_eq!(header_size + hint.key.len(), written);
        assert_eq!(hint.header, formatter.decode_row_hint_header(&bs));
        // The key follows the fixed-size hint header verbatim.
        assert_eq!(&hint.key[..], &bs[header_size..written]);
    }

    #[test]
    fn test_encode_decode_row() {
        let k = b"Hello".to_vec();
        let v = b"World".to_vec();
        let row = RowToWrite {
            meta: RowMeta {
                expire_timestamp: 12345,
                key_size: k.len(),
                value_size: v.len(),
            },
            key: k,
            value: v,
        };

        let formatter = FormatterV1 {};
        let mut bs: Vec<u8> = vec![0_u8; 2048];

        let written = formatter.encode_row(&row, bs.as_mut());
        assert_eq!(formatter.net_row_size(&row), written);

        let header = formatter.decode_row_header(bs.as_ref());
        assert_eq!(row.meta, header.meta);

        // Key and value are laid out back to back right after the header, and the
        // stored checksum must validate against them.
        let kv = &bs[formatter.row_header_size()..written];
        assert_eq!(&b"HelloWorld"[..], kv);
        assert!(formatter.validate_key_value(&header, kv).is_ok());
    }

    #[test]
    fn test_validate_key_value_detects_corruption() {
        let k = b"Hello".to_vec();
        let v = b"World".to_vec();
        let row = RowToWrite {
            meta: RowMeta {
                expire_timestamp: 12345,
                key_size: k.len(),
                value_size: v.len(),
            },
            key: k,
            value: v,
        };

        let formatter = FormatterV1 {};
        let mut bs: Vec<u8> = vec![0_u8; 2048];
        let written = formatter.encode_row(&row, bs.as_mut());
        let header = formatter.decode_row_header(bs.as_ref());

        // Flip one bit in the last value byte; a CRC catches every single-bit error.
        bs[written - 1] ^= 0x01;
        let kv = &bs[formatter.row_header_size()..written];
        assert!(matches!(
            formatter.validate_key_value(&header, kv),
            Err(FormatterError::CrcCheckFailed { .. })
        ));
    }
}
218}