1#![warn(missing_docs)]
2
3use bytes::{BufMut, Bytes, BytesMut};
4use crc32fast::Hasher;
5use yyds_types::DsValue;
6
/// Size in bytes of the fixed header that prefixes every encoded value.
pub const HEADER_SIZE: usize = 32;
/// Two-byte marker (`b"YY"`) stored at the very start of every header.
pub const MAGIC: [u8; 2] = *b"YY";

/// Layout tag stored in byte 2 of a [`Header`].
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueType {
    /// Payload without a container layout of its own (tag `0`).
    Raw = 0,
    /// List payload (tag `1`).
    List = 1,
    /// Dict payload (tag `2`).
    Dict = 2,
    /// Object payload (tag `3`).
    Object = 3,
}

/// Fixed-size header written in front of every payload.
///
/// Byte layout used by [`Header::to_bytes`] and [`Header::from_bytes`]
/// (all integers little-endian):
///
/// ```text
/// 0..2    magic
/// 2       value_type
/// 3       flags
/// 4..8    length
/// 8..12   checksum
/// 12..16  payload_offset
/// 16..32  reserved
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Header {
    /// Magic marker; [`Header::new`] and [`Header::from_bytes`] always set it to [`MAGIC`].
    pub magic: [u8; 2],
    /// Layout tag describing how the payload is organised.
    pub value_type: ValueType,
    /// Flag bits; [`Header::new`] initialises them to `0`.
    pub flags: u8,
    /// Length recorded by the writer (the encoders in this file store header + payload size).
    pub length: u32,
    /// Checksum recorded by the writer; [`Header::new`] leaves it at `0`.
    pub checksum: u32,
    /// Offset recorded by the writer (the encoders in this file store [`HEADER_SIZE`]).
    pub payload_offset: u32,
    /// Reserved bytes occupying the tail of the header; zero unless set explicitly.
    pub reserved: [u8; 16],
}

impl Header {
    /// Builds a header with the standard magic, zero flags, zero checksum and zeroed
    /// reserved bytes. The caller fills in `checksum` afterwards.
    pub fn new(value_type: ValueType, length: u32, payload_offset: u32) -> Self {
        Self {
            magic: MAGIC,
            value_type,
            flags: 0,
            length,
            checksum: 0,
            payload_offset,
            reserved: [0; 16],
        }
    }

    /// Serialises the header into its fixed [`HEADER_SIZE`]-byte wire form.
    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
        let mut buf = [0u8; HEADER_SIZE];
        buf[0..2].copy_from_slice(&self.magic);
        buf[2] = self.value_type as u8;
        buf[3] = self.flags;
        buf[4..8].copy_from_slice(&self.length.to_le_bytes());
        buf[8..12].copy_from_slice(&self.checksum.to_le_bytes());
        buf[12..16].copy_from_slice(&self.payload_offset.to_le_bytes());
        // Emit the reserved bytes too, so a header survives a to_bytes/from_bytes round trip.
        buf[16..HEADER_SIZE].copy_from_slice(&self.reserved);
        buf
    }

    /// Parses a header from the first [`HEADER_SIZE`] bytes of `data`.
    ///
    /// Returns `None` when `data` is shorter than a header, does not start with [`MAGIC`],
    /// or carries a type tag that is not a known [`ValueType`]. Bytes beyond the header
    /// are ignored.
    pub fn from_bytes(data: &[u8]) -> Option<Header> {
        if data.len() < HEADER_SIZE || !data.starts_with(&MAGIC) {
            return None;
        }

        // An unknown tag means corrupt or foreign data; do not reinterpret it as `Raw`.
        let value_type = match data[2] {
            0 => ValueType::Raw,
            1 => ValueType::List,
            2 => ValueType::Dict,
            3 => ValueType::Object,
            _ => return None,
        };

        // The length check above guarantees every index below is in range.
        let word =
            |at: usize| u32::from_le_bytes([data[at], data[at + 1], data[at + 2], data[at + 3]]);

        let mut reserved = [0u8; 16];
        reserved.copy_from_slice(&data[16..HEADER_SIZE]);

        Some(Self {
            magic: MAGIC,
            value_type,
            flags: data[3],
            length: word(4),
            checksum: word(8),
            payload_offset: word(12),
            reserved,
        })
    }
}
75
76pub struct ListLayout;
77impl ListLayout {
78 pub fn encode(items: Vec<Bytes>) -> Bytes {
80 let count = items.len() as u32;
81 let mut offset_table = Vec::with_capacity(items.len());
82 let mut current_offset = 4 + (4 * count); let mut payload = BytesMut::new();
85 payload.put_u32_le(count);
86
87 for item in &items {
88 offset_table.push(current_offset);
89 current_offset += item.len() as u32;
90 }
91
92 for offset in offset_table {
93 payload.put_u32_le(offset);
94 }
95
96 for item in items {
97 payload.put(item);
98 }
99
100 let mut header = Header::new(
101 ValueType::List,
102 payload.len() as u32 + HEADER_SIZE as u32,
103 HEADER_SIZE as u32,
104 );
105 let mut hasher = Hasher::new();
106 hasher.update(&payload);
107 header.checksum = hasher.finalize();
108
109 let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
110 full.put_slice(&header.to_bytes());
111 full.put(payload);
112 full.freeze()
113 }
114}
115
116pub struct DictLayout;
117impl DictLayout {
118 pub fn encode(mut entries: Vec<(String, Bytes)>) -> Bytes {
120 entries.sort_by_key(|(k, _)| {
122 let mut h = Hasher::new();
123 h.update(k.as_bytes());
124 h.finalize()
125 });
126
127 let count = entries.len() as u32;
128 let mut index_data = BytesMut::new();
129 let mut kv_data = BytesMut::new();
130
131 let mut current_offset = 4 + (8 * count); for (key, val) in entries {
134 let mut h = Hasher::new();
135 h.update(key.as_bytes());
136 let hash = h.finalize();
137
138 index_data.put_u32_le(hash);
139 index_data.put_u32_le(current_offset);
140
141 let val_len = val.len() as u32;
143 kv_data.put_u32_le(key.len() as u32);
144 kv_data.put_slice(key.as_bytes());
145 kv_data.put_u32_le(val_len);
146 kv_data.put(val);
147
148 current_offset += 4 + key.len() as u32 + 4 + val_len;
149 }
150
151 let mut payload = BytesMut::new();
152 payload.put_u32_le(count);
153 payload.put(index_data);
154 payload.put(kv_data);
155
156 let mut header = Header::new(
157 ValueType::Dict,
158 payload.len() as u32 + HEADER_SIZE as u32,
159 HEADER_SIZE as u32,
160 );
161 let mut hasher = Hasher::new();
162 hasher.update(&payload);
163 header.checksum = hasher.finalize();
164
165 let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
166 full.put_slice(&header.to_bytes());
167 full.put(payload);
168 full.freeze()
169 }
170}
171
172pub struct ObjectLayout;
173impl ObjectLayout {
174 pub fn encode(schema_id: u32, mut fields: Vec<(u32, Bytes)>) -> Bytes {
176 fields.sort_by_key(|(tag, _)| *tag);
177
178 let count = fields.len() as u32;
179 let mut tag_map = BytesMut::new();
180 let mut field_data = BytesMut::new();
181
182 let mut current_offset = 8 + (8 * count); for (tag, data) in fields {
185 tag_map.put_u32_le(tag);
186 tag_map.put_u32_le(current_offset);
187
188 let data_len = data.len() as u32;
189 field_data.put_u32_le(data_len);
190 field_data.put(data);
191
192 current_offset += 4 + data_len;
193 }
194
195 let mut payload = BytesMut::new();
196 payload.put_u32_le(schema_id);
197 payload.put_u32_le(count);
198 payload.put(tag_map);
199 payload.put(field_data);
200
201 let mut header = Header::new(
202 ValueType::Object,
203 payload.len() as u32 + HEADER_SIZE as u32,
204 HEADER_SIZE as u32,
205 );
206 let mut hasher = Hasher::new();
207 hasher.update(&payload);
208 header.checksum = hasher.finalize();
209
210 let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
211 full.put_slice(&header.to_bytes());
212 full.put(payload);
213 full.freeze()
214 }
215}
216
/// Stateless handle through which values are serialized and deserialized.
#[derive(Default)]
pub struct LayoutManager;

impl LayoutManager {
    /// Creates a manager. The type has no fields, so every instance is equivalent.
    pub fn new() -> Self {
        LayoutManager
    }
}
230
231impl LayoutManager {
232 #[allow(clippy::only_used_in_recursion)]
233 pub fn serialize(&self, value: &DsValue) -> Bytes {
234 match value {
235 DsValue::List(items) => {
236 let serialized_items: Vec<Bytes> =
237 items.iter().map(|v| self.serialize(v)).collect();
238 ListLayout::encode(serialized_items)
239 }
240 DsValue::Dict(entries) => {
241 let serialized_entries: Vec<(String, Bytes)> = entries
242 .iter()
243 .map(|(k, v)| (k.clone(), self.serialize(v)))
244 .collect();
245 DictLayout::encode(serialized_entries)
246 }
247 DsValue::Object { schema_id, fields } => {
248 let serialized_fields: Vec<(u32, Bytes)> = fields
249 .iter()
250 .map(|(tag, v)| (*tag, self.serialize(v)))
251 .collect();
252 ObjectLayout::encode(*schema_id, serialized_fields)
253 }
254 _ => {
255 let payload = yykv_types::layout::DsValueEncoder::encode(value)
257 .unwrap_or_else(|_| Bytes::new());
258 let mut header = Header::new(
259 ValueType::Raw,
260 payload.len() as u32 + HEADER_SIZE as u32,
261 HEADER_SIZE as u32,
262 );
263 let mut hasher = Hasher::new();
264 hasher.update(&payload);
265 header.checksum = hasher.finalize();
266
267 let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
268 full.put_slice(&header.to_bytes());
269 full.put(payload);
270 full.freeze()
271 }
272 }
273 }
274
275 #[allow(clippy::only_used_in_recursion)]
276 pub fn deserialize(&self, data: &[u8]) -> Option<DsValue> {
277 let header = Header::from_bytes(data)?;
278 let payload = &data[HEADER_SIZE..];
279
280 let mut hasher = Hasher::new();
282 hasher.update(payload);
283 if hasher.finalize() != header.checksum {
284 return None;
285 }
286
287 match header.value_type {
288 ValueType::List => {
289 let count = u32::from_le_bytes(payload[0..4].try_into().ok()?) as usize;
290 let offset_table_start = 4;
291 let mut items = Vec::with_capacity(count);
292
293 for i in 0..count {
294 let start = offset_table_start + (i * 4);
295 let offset =
296 u32::from_le_bytes(payload[start..start + 4].try_into().ok()?) as usize;
297
298 let end = if i + 1 < count {
300 u32::from_le_bytes(payload[start + 4..start + 8].try_into().ok()?) as usize
301 } else {
302 payload.len()
303 };
304
305 let item_data = &payload[offset..end];
306 items.push(self.deserialize(item_data)?);
307 }
308 Some(DsValue::List(items))
309 }
310 ValueType::Dict => {
311 let count = u32::from_le_bytes(payload[0..4].try_into().ok()?) as usize;
312 let mut entries = std::collections::BTreeMap::new();
313 let index_start = 4;
314
315 for i in 0..count {
316 let entry_start = index_start + (i * 8);
317 let offset = u32::from_le_bytes(
319 payload[entry_start + 4..entry_start + 8].try_into().ok()?,
320 ) as usize;
321
322 let mut curr = offset;
323 let key_len =
324 u32::from_le_bytes(payload[curr..curr + 4].try_into().ok()?) as usize;
325 curr += 4;
326 let key = String::from_utf8(payload[curr..curr + key_len].to_vec()).ok()?;
327 curr += key_len;
328
329 let val_len =
330 u32::from_le_bytes(payload[curr..curr + 4].try_into().ok()?) as usize;
331 curr += 4;
332 let val_data = &payload[curr..curr + val_len];
333 entries.insert(key, self.deserialize(val_data)?);
334 }
335 Some(DsValue::Dict(entries))
336 }
337 ValueType::Object => {
338 let schema_id = u32::from_le_bytes(payload[0..4].try_into().ok()?);
339 let count = u32::from_le_bytes(payload[4..8].try_into().ok()?) as usize;
340 let mut fields = std::collections::BTreeMap::new();
341 let map_start = 8;
342
343 for i in 0..count {
344 let entry_start = map_start + (i * 8);
345 let tag =
346 u32::from_le_bytes(payload[entry_start..entry_start + 4].try_into().ok()?);
347 let offset = u32::from_le_bytes(
348 payload[entry_start + 4..entry_start + 8].try_into().ok()?,
349 ) as usize;
350
351 let val_len =
352 u32::from_le_bytes(payload[offset..offset + 4].try_into().ok()?) as usize;
353 let val_data = &payload[offset + 4..offset + 4 + val_len];
354 fields.insert(tag, self.deserialize(val_data)?);
355 }
356 Some(DsValue::Object { schema_id, fields })
357 }
358 ValueType::Raw => {
359 let mut data = Bytes::copy_from_slice(payload);
360 yyds_types::layout::DsValueDecoder::decode(&mut data).ok()
361 }
362 }
363 }
364}