bitcasky_common/formatter/
formatter_v1.rs1use std::ops::Deref;
2
3use byteorder::{ByteOrder, LittleEndian};
4use bytes::{Buf, Bytes};
5use crc::{Crc, CRC_32_CKSUM};
6
7use crate::copy_memory;
8
9use super::{
10 Formatter, FormatterError, MergeMeta, Result, RowHeader, RowHintHeader, RowMeta, RowToWrite,
11};
12
// Widths, in bytes, of the fixed-size fields. The CRC is written as a u32; the timestamp,
// both sizes and the row offset are written as u64.
const CRC_SIZE: usize = 4;
const TSTAMP_SIZE: usize = 8;
const KEY_SIZE_SIZE: usize = 8;
const VALUE_SIZE_SIZE: usize = 8;

// Data-file row layout:
//   | crc | expire timestamp | key size | value size | key bytes | value bytes |
// Each offset builds on the previous field so the chain cannot drift out of sync.
const DATA_FILE_TSTAMP_OFFSET: usize = CRC_SIZE;
const DATA_FILE_KEY_SIZE_OFFSET: usize = DATA_FILE_TSTAMP_OFFSET + TSTAMP_SIZE;
const DATA_FILE_VALUE_SIZE_OFFSET: usize = DATA_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE;
const DATA_FILE_KEY_OFFSET: usize = DATA_FILE_VALUE_SIZE_OFFSET + VALUE_SIZE_SIZE;

// Hint-file row layout:
//   | expire timestamp | key size | row offset | key bytes |
const ROW_OFFSET_SIZE: usize = 8;
const HINT_FILE_KEY_SIZE_OFFSET: usize = TSTAMP_SIZE;
const HINT_FILE_ROW_OFFSET_OFFSET: usize = HINT_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE;
const HINT_FILE_KEY_OFFSET: usize = HINT_FILE_ROW_OFFSET_OFFSET + ROW_OFFSET_SIZE;
// The key directly follows the fixed header, so the header size is the key offset.
const HINT_FILE_HEADER_SIZE: usize = HINT_FILE_KEY_OFFSET;

// The merge meta is a single u32 storage id.
const MERGE_META_FILE_SIZE: usize = 4;
29
/// Stateless [`Formatter`] implementation.
///
/// The type has no fields, so every instance is interchangeable and all instances compare
/// equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct FormatterV1 {}
32
33impl FormatterV1 {
34 fn gen_crc<V: Deref<Target = [u8]>>(&self, meta: &RowMeta, key: &[u8], value: &V) -> u32 {
35 let crc32 = Crc::<u32>::new(&CRC_32_CKSUM);
36 let mut ck = crc32.digest();
37 ck.update(&meta.expire_timestamp.to_be_bytes());
38 ck.update(&meta.key_size.to_be_bytes());
39 ck.update(&value.len().to_be_bytes());
40 ck.update(key.as_ref());
41 ck.update(value);
42 ck.finalize()
43 }
44
45 fn gen_crc_by_kv_bytes(&self, meta: &RowMeta, kv: &[u8]) -> u32 {
46 let crc32 = Crc::<u32>::new(&CRC_32_CKSUM);
47 let mut ck = crc32.digest();
48 ck.update(&meta.expire_timestamp.to_be_bytes());
49 ck.update(&meta.key_size.to_be_bytes());
50 ck.update(&meta.value_size.to_be_bytes());
51 ck.update(kv);
52 ck.finalize()
53 }
54}
55
56impl Formatter for FormatterV1 {
57 fn row_header_size(&self) -> usize {
58 DATA_FILE_KEY_OFFSET
59 }
60
61 fn net_row_size<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
62 &self,
63 row: &RowToWrite<K, V>,
64 ) -> usize {
65 self.row_header_size() + row.key.as_ref().len() + row.value.len()
66 }
67
68 fn encode_row<K: AsRef<[u8]>, V: Deref<Target = [u8]>>(
69 &self,
70 row: &RowToWrite<K, V>,
71 bs: &mut [u8],
72 ) -> usize {
73 let crc = self.gen_crc(&row.meta, row.key.as_ref(), &row.value);
74 LittleEndian::write_u32(bs, crc);
75 LittleEndian::write_u64(&mut bs[4..], row.meta.expire_timestamp);
76 LittleEndian::write_u64(&mut bs[12..], row.meta.key_size as u64);
77 LittleEndian::write_u64(&mut bs[20..], row.meta.value_size as u64);
78 copy_memory(row.key.as_ref(), &mut bs[28..]);
79 copy_memory(&row.value, &mut bs[28 + row.key.as_ref().len()..]);
80 self.net_row_size(row)
81 }
82
83 fn decode_row_header(&self, bs: &[u8]) -> RowHeader {
84 let expected_crc = LittleEndian::read_u32(&bs[0..DATA_FILE_TSTAMP_OFFSET]);
85 let timestamp =
86 LittleEndian::read_u64(&bs[DATA_FILE_TSTAMP_OFFSET..DATA_FILE_KEY_SIZE_OFFSET]);
87 let key_size = LittleEndian::read_u64(
88 &bs[DATA_FILE_KEY_SIZE_OFFSET..(DATA_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE)],
89 ) as usize;
90 let val_size = LittleEndian::read_u64(
91 &bs[DATA_FILE_VALUE_SIZE_OFFSET..(DATA_FILE_VALUE_SIZE_OFFSET + VALUE_SIZE_SIZE)],
92 ) as usize;
93 RowHeader {
94 crc: expected_crc,
95 meta: RowMeta {
96 expire_timestamp: timestamp,
97 key_size,
98 value_size: val_size,
99 },
100 }
101 }
102
103 fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()> {
104 let actual_crc = self.gen_crc_by_kv_bytes(&header.meta, kv);
105 if header.crc != actual_crc {
106 return Err(FormatterError::CrcCheckFailed {
107 expected_crc: header.crc,
108 actual_crc,
109 });
110 }
111 Ok(())
112 }
113
114 fn encode_row_hint(&self, hint: &super::RowHint, output: &mut [u8]) -> usize {
115 let header = &hint.header;
116
117 LittleEndian::write_u64(output, header.expire_timestamp);
118 LittleEndian::write_u64(&mut output[8..], header.key_size as u64);
119 LittleEndian::write_u64(&mut output[16..], header.row_offset as u64);
120 copy_memory(&hint.key, &mut output[24..]);
121 HINT_FILE_HEADER_SIZE + hint.key.len()
122 }
123
124 fn row_hint_header_size(&self) -> usize {
125 HINT_FILE_HEADER_SIZE
126 }
127
128 fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader {
129 let timestamp = LittleEndian::read_u64(&header_bs[0..TSTAMP_SIZE]);
130 let key_size = LittleEndian::read_u64(
131 &header_bs[HINT_FILE_KEY_SIZE_OFFSET..HINT_FILE_ROW_OFFSET_OFFSET],
132 ) as usize;
133 let row_offset =
134 LittleEndian::read_u64(&header_bs[HINT_FILE_ROW_OFFSET_OFFSET..HINT_FILE_KEY_OFFSET])
135 as usize;
136 RowHintHeader {
137 expire_timestamp: timestamp,
138 key_size,
139 row_offset,
140 }
141 }
142
143 fn merge_meta_size(&self) -> usize {
144 MERGE_META_FILE_SIZE
145 }
146
147 fn encode_merge_meta(&self, meta: &super::MergeMeta) -> Bytes {
148 Bytes::copy_from_slice(&meta.known_max_storage_id.to_be_bytes())
149 }
150
151 fn decode_merge_meta(&self, mut meta: Bytes) -> MergeMeta {
152 let known_max_storage_id = meta.get_u32();
153 MergeMeta {
154 known_max_storage_id,
155 }
156 }
157}
158
#[cfg(test)]
mod tests {
    use crate::formatter::RowHint;

    use super::*;

    use test_log::test;

    /// Builds a small row whose meta sizes match the actual key and value lengths.
    fn sample_row() -> RowToWrite<Vec<u8>, Vec<u8>> {
        let k = b"Hello".to_vec();
        let v = b"World".to_vec();
        RowToWrite {
            meta: RowMeta {
                expire_timestamp: 12345,
                key_size: k.len(),
                value_size: v.len(),
            },
            key: k,
            value: v,
        }
    }

    #[test]
    fn test_encode_decode_merge_meta() {
        let formatter = FormatterV1 {};
        let merge_meta = MergeMeta {
            known_max_storage_id: 123,
        };

        let encoded = formatter.encode_merge_meta(&merge_meta);

        assert_eq!(formatter.merge_meta_size(), encoded.len());
        assert_eq!(merge_meta, formatter.decode_merge_meta(encoded));
    }

    #[test]
    fn test_encode_decode_row_hint() {
        let k = b"Hello".to_vec();
        let hint = RowHint {
            header: RowHintHeader {
                expire_timestamp: 12345,
                key_size: k.len(),
                row_offset: 56789,
            },
            key: k,
        };

        let formatter = FormatterV1 {};
        let header_size = formatter.row_hint_header_size();
        let mut bs: Vec<u8> = vec![0_u8; 2048];

        let written = formatter.encode_row_hint(&hint, bs.as_mut());

        // The reported size covers the header plus the key, and the key sits right after
        // the header.
        assert_eq!(header_size + hint.key.len(), written);
        assert_eq!(hint.header, formatter.decode_row_hint_header(&bs));
        assert_eq!(&hint.key[..], &bs[header_size..written]);
    }

    #[test]
    fn test_encode_decode_row() {
        let row = sample_row();

        let formatter = FormatterV1 {};
        let header_size = formatter.row_header_size();
        let mut bs: Vec<u8> = vec![0_u8; 2048];

        let written = formatter.encode_row(&row, bs.as_mut());
        assert_eq!(formatter.net_row_size(&row), written);

        let header = formatter.decode_row_header(bs.as_ref());
        assert_eq!(row.meta, header.meta);

        // Key and value are laid out back to back after the header.
        let key_end = header_size + row.key.len();
        assert_eq!(&row.key[..], &bs[header_size..key_end]);
        assert_eq!(&row.value[..], &bs[key_end..written]);

        // The checksum written by `encode_row` must validate against the stored payload.
        assert!(formatter
            .validate_key_value(&header, &bs[header_size..written])
            .is_ok());
    }

    #[test]
    fn test_validate_key_value_detects_corruption() {
        let row = sample_row();

        let formatter = FormatterV1 {};
        let mut bs: Vec<u8> = vec![0_u8; 2048];

        let written = formatter.encode_row(&row, bs.as_mut());
        let header = formatter.decode_row_header(bs.as_ref());

        // Flip every bit of the last payload byte; the stored checksum no longer matches.
        bs[written - 1] ^= 0xFF;

        let kv = &bs[formatter.row_header_size()..written];
        assert!(matches!(
            formatter.validate_key_value(&header, kv),
            Err(FormatterError::CrcCheckFailed { .. })
        ));
    }
}
218}