//! `yykv_layout`: fixed-header binary layouts (raw, list, dict and object)
//! used to serialize and deserialize `DsValue` trees.

1#![warn(missing_docs)]
2
3use bytes::{BufMut, Bytes, BytesMut};
4use crc32fast::Hasher;
5use yyds_types::DsValue;
6
/// Size in bytes of the fixed [`Header`] that prefixes every encoded value.
pub const HEADER_SIZE: usize = 32;
/// Magic bytes (`"YY"`) that open every encoded [`Header`].
pub const MAGIC: [u8; 2] = *b"YY";

/// Kind of payload that follows a [`Header`]; stored on the wire as one byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ValueType {
    /// A primitive value encoded by the fallback value encoder.
    Raw = 0,
    /// An ordered sequence of nested encoded values addressed via an offset table.
    List = 1,
    /// String-keyed entries with an index sorted by the CRC32 of each key.
    Dict = 2,
    /// Tagged fields stored under a schema identifier.
    Object = 3,
}

/// Fixed-size header written in front of every encoded value.
///
/// On the wire it occupies [`HEADER_SIZE`] bytes, integers little-endian:
///
/// | bytes    | field            |
/// |----------|------------------|
/// | `0..2`   | `magic`          |
/// | `2`      | `value_type`     |
/// | `3`      | `flags`          |
/// | `4..8`   | `length`         |
/// | `8..12`  | `checksum`       |
/// | `12..16` | `payload_offset` |
/// | `16..32` | `reserved`       |
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Header {
    /// Format marker; [`Header::from_bytes`] only accepts [`MAGIC`].
    pub magic: [u8; 2],
    /// Kind of payload that follows the header.
    pub value_type: ValueType,
    /// Flag bits; [`Header::new`] sets them to 0.
    pub flags: u8,
    /// Total encoded size in bytes, header included.
    pub length: u32,
    /// CRC32 of the payload bytes; [`Header::new`] leaves it 0 for the caller to fill in.
    pub checksum: u32,
    /// Byte offset of the payload from the start of the header.
    pub payload_offset: u32,
    /// Reserved bytes; written and read back verbatim.
    pub reserved: [u8; 16],
}

impl Header {
    /// Creates a header carrying [`MAGIC`], zeroed flags and reserved bytes, and a zero checksum.
    ///
    /// Callers are expected to set `checksum` after hashing the payload.
    pub fn new(value_type: ValueType, length: u32, payload_offset: u32) -> Self {
        Self {
            magic: MAGIC,
            value_type,
            flags: 0,
            length,
            checksum: 0,
            payload_offset,
            reserved: [0; 16],
        }
    }

    /// Serializes the header into its fixed [`HEADER_SIZE`]-byte wire form.
    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
        let mut buf = [0u8; HEADER_SIZE];
        buf[0..2].copy_from_slice(&self.magic);
        buf[2] = self.value_type as u8;
        buf[3] = self.flags;
        buf[4..8].copy_from_slice(&self.length.to_le_bytes());
        buf[8..12].copy_from_slice(&self.checksum.to_le_bytes());
        buf[12..16].copy_from_slice(&self.payload_offset.to_le_bytes());
        // Persist the reserved bytes so every public field survives a round trip.
        buf[16..32].copy_from_slice(&self.reserved);
        buf
    }

    /// Parses a header from the first [`HEADER_SIZE`] bytes of `data`.
    ///
    /// Returns `None` if `data` is shorter than [`HEADER_SIZE`], does not start
    /// with [`MAGIC`], or carries an unknown value-type byte. Bytes after the
    /// header are ignored.
    pub fn from_bytes(data: &[u8]) -> Option<Header> {
        let head = data.get(..HEADER_SIZE)?;
        if head[0..2] != MAGIC {
            return None;
        }
        let value_type = match head[2] {
            0 => ValueType::Raw,
            1 => ValueType::List,
            2 => ValueType::Dict,
            3 => ValueType::Object,
            // The payload checksum does not cover the header, so an unknown tag
            // (corruption or a newer format) must be rejected, not guessed as Raw.
            _ => return None,
        };
        let read_u32 =
            |at: usize| u32::from_le_bytes([head[at], head[at + 1], head[at + 2], head[at + 3]]);
        let mut reserved = [0u8; 16];
        reserved.copy_from_slice(&head[16..32]);
        Some(Self {
            magic: MAGIC,
            value_type,
            flags: head[3],
            length: read_u32(4),
            checksum: read_u32(8),
            payload_offset: read_u32(12),
            reserved,
        })
    }
}
75
76pub struct ListLayout;
77impl ListLayout {
78    /// Layout: [Header] | [ItemCount(u32)] | [OffsetTable(u32 * ItemCount)] | [Item1][Item2]...
79    pub fn encode(items: Vec<Bytes>) -> Bytes {
80        let count = items.len() as u32;
81        let mut offset_table = Vec::with_capacity(items.len());
82        let mut current_offset = 4 + (4 * count); // Skip count and table
83
84        let mut payload = BytesMut::new();
85        payload.put_u32_le(count);
86
87        for item in &items {
88            offset_table.push(current_offset);
89            current_offset += item.len() as u32;
90        }
91
92        for offset in offset_table {
93            payload.put_u32_le(offset);
94        }
95
96        for item in items {
97            payload.put(item);
98        }
99
100        let mut header = Header::new(
101            ValueType::List,
102            payload.len() as u32 + HEADER_SIZE as u32,
103            HEADER_SIZE as u32,
104        );
105        let mut hasher = Hasher::new();
106        hasher.update(&payload);
107        header.checksum = hasher.finalize();
108
109        let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
110        full.put_slice(&header.to_bytes());
111        full.put(payload);
112        full.freeze()
113    }
114}
115
116pub struct DictLayout;
117impl DictLayout {
118    /// Layout: [Header] | [KeyCount(u32)] | [SortedHashIndex(Hash:u32, Offset:u32) * KeyCount] | [KeyData] | [ValueData]
119    pub fn encode(mut entries: Vec<(String, Bytes)>) -> Bytes {
120        // Sort by key hash for O(log N) lookup
121        entries.sort_by_key(|(k, _)| {
122            let mut h = Hasher::new();
123            h.update(k.as_bytes());
124            h.finalize()
125        });
126
127        let count = entries.len() as u32;
128        let mut index_data = BytesMut::new();
129        let mut kv_data = BytesMut::new();
130
131        let mut current_offset = 4 + (8 * count); // Skip count and index table
132
133        for (key, val) in entries {
134            let mut h = Hasher::new();
135            h.update(key.as_bytes());
136            let hash = h.finalize();
137
138            index_data.put_u32_le(hash);
139            index_data.put_u32_le(current_offset);
140
141            // Write Key (len + data) and Value (len + data)
142            let val_len = val.len() as u32;
143            kv_data.put_u32_le(key.len() as u32);
144            kv_data.put_slice(key.as_bytes());
145            kv_data.put_u32_le(val_len);
146            kv_data.put(val);
147
148            current_offset += 4 + key.len() as u32 + 4 + val_len;
149        }
150
151        let mut payload = BytesMut::new();
152        payload.put_u32_le(count);
153        payload.put(index_data);
154        payload.put(kv_data);
155
156        let mut header = Header::new(
157            ValueType::Dict,
158            payload.len() as u32 + HEADER_SIZE as u32,
159            HEADER_SIZE as u32,
160        );
161        let mut hasher = Hasher::new();
162        hasher.update(&payload);
163        header.checksum = hasher.finalize();
164
165        let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
166        full.put_slice(&header.to_bytes());
167        full.put(payload);
168        full.freeze()
169    }
170}
171
172pub struct ObjectLayout;
173impl ObjectLayout {
174    /// Layout: [Header] | [SchemaID(u32)] | [TagCount(u32)] | [TagMap(Tag:u32, Offset:u32) * TagCount] | [FieldData]
175    pub fn encode(schema_id: u32, mut fields: Vec<(u32, Bytes)>) -> Bytes {
176        fields.sort_by_key(|(tag, _)| *tag);
177
178        let count = fields.len() as u32;
179        let mut tag_map = BytesMut::new();
180        let mut field_data = BytesMut::new();
181
182        let mut current_offset = 8 + (8 * count); // Skip schema_id, count and tag map
183
184        for (tag, data) in fields {
185            tag_map.put_u32_le(tag);
186            tag_map.put_u32_le(current_offset);
187
188            let data_len = data.len() as u32;
189            field_data.put_u32_le(data_len);
190            field_data.put(data);
191
192            current_offset += 4 + data_len;
193        }
194
195        let mut payload = BytesMut::new();
196        payload.put_u32_le(schema_id);
197        payload.put_u32_le(count);
198        payload.put(tag_map);
199        payload.put(field_data);
200
201        let mut header = Header::new(
202            ValueType::Object,
203            payload.len() as u32 + HEADER_SIZE as u32,
204            HEADER_SIZE as u32,
205        );
206        let mut hasher = Hasher::new();
207        hasher.update(&payload);
208        header.checksum = hasher.finalize();
209
210        let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
211        full.put_slice(&header.to_bytes());
212        full.put(payload);
213        full.freeze()
214    }
215}
216
/// Converts `DsValue` trees to and from this crate's binary layouts.
///
/// The manager is a stateless unit struct; `new()` and `default()` are equivalent.
#[derive(Debug, Clone, Copy, Default)]
pub struct LayoutManager;

impl LayoutManager {
    /// Creates a layout manager (equivalent to [`LayoutManager::default`]).
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
230
231impl LayoutManager {
232    #[allow(clippy::only_used_in_recursion)]
233    pub fn serialize(&self, value: &DsValue) -> Bytes {
234        match value {
235            DsValue::List(items) => {
236                let serialized_items: Vec<Bytes> =
237                    items.iter().map(|v| self.serialize(v)).collect();
238                ListLayout::encode(serialized_items)
239            }
240            DsValue::Dict(entries) => {
241                let serialized_entries: Vec<(String, Bytes)> = entries
242                    .iter()
243                    .map(|(k, v)| (k.clone(), self.serialize(v)))
244                    .collect();
245                DictLayout::encode(serialized_entries)
246            }
247            DsValue::Object { schema_id, fields } => {
248                let serialized_fields: Vec<(u32, Bytes)> = fields
249                    .iter()
250                    .map(|(tag, v)| (*tag, self.serialize(v)))
251                    .collect();
252                ObjectLayout::encode(*schema_id, serialized_fields)
253            }
254            _ => {
255                // Fallback to simple binary encoding for primitive types
256                let payload = yykv_types::layout::DsValueEncoder::encode(value)
257                    .unwrap_or_else(|_| Bytes::new());
258                let mut header = Header::new(
259                    ValueType::Raw,
260                    payload.len() as u32 + HEADER_SIZE as u32,
261                    HEADER_SIZE as u32,
262                );
263                let mut hasher = Hasher::new();
264                hasher.update(&payload);
265                header.checksum = hasher.finalize();
266
267                let mut full = BytesMut::with_capacity(HEADER_SIZE + payload.len());
268                full.put_slice(&header.to_bytes());
269                full.put(payload);
270                full.freeze()
271            }
272        }
273    }
274
275    #[allow(clippy::only_used_in_recursion)]
276    pub fn deserialize(&self, data: &[u8]) -> Option<DsValue> {
277        let header = Header::from_bytes(data)?;
278        let payload = &data[HEADER_SIZE..];
279
280        // Verify checksum
281        let mut hasher = Hasher::new();
282        hasher.update(payload);
283        if hasher.finalize() != header.checksum {
284            return None;
285        }
286
287        match header.value_type {
288            ValueType::List => {
289                let count = u32::from_le_bytes(payload[0..4].try_into().ok()?) as usize;
290                let offset_table_start = 4;
291                let mut items = Vec::with_capacity(count);
292
293                for i in 0..count {
294                    let start = offset_table_start + (i * 4);
295                    let offset =
296                        u32::from_le_bytes(payload[start..start + 4].try_into().ok()?) as usize;
297
298                    // The item ends at the next offset or end of payload
299                    let end = if i + 1 < count {
300                        u32::from_le_bytes(payload[start + 4..start + 8].try_into().ok()?) as usize
301                    } else {
302                        payload.len()
303                    };
304
305                    let item_data = &payload[offset..end];
306                    items.push(self.deserialize(item_data)?);
307                }
308                Some(DsValue::List(items))
309            }
310            ValueType::Dict => {
311                let count = u32::from_le_bytes(payload[0..4].try_into().ok()?) as usize;
312                let mut entries = std::collections::BTreeMap::new();
313                let index_start = 4;
314
315                for i in 0..count {
316                    let entry_start = index_start + (i * 8);
317                    // Skip hash (first 4 bytes of entry)
318                    let offset = u32::from_le_bytes(
319                        payload[entry_start + 4..entry_start + 8].try_into().ok()?,
320                    ) as usize;
321
322                    let mut curr = offset;
323                    let key_len =
324                        u32::from_le_bytes(payload[curr..curr + 4].try_into().ok()?) as usize;
325                    curr += 4;
326                    let key = String::from_utf8(payload[curr..curr + key_len].to_vec()).ok()?;
327                    curr += key_len;
328
329                    let val_len =
330                        u32::from_le_bytes(payload[curr..curr + 4].try_into().ok()?) as usize;
331                    curr += 4;
332                    let val_data = &payload[curr..curr + val_len];
333                    entries.insert(key, self.deserialize(val_data)?);
334                }
335                Some(DsValue::Dict(entries))
336            }
337            ValueType::Object => {
338                let schema_id = u32::from_le_bytes(payload[0..4].try_into().ok()?);
339                let count = u32::from_le_bytes(payload[4..8].try_into().ok()?) as usize;
340                let mut fields = std::collections::BTreeMap::new();
341                let map_start = 8;
342
343                for i in 0..count {
344                    let entry_start = map_start + (i * 8);
345                    let tag =
346                        u32::from_le_bytes(payload[entry_start..entry_start + 4].try_into().ok()?);
347                    let offset = u32::from_le_bytes(
348                        payload[entry_start + 4..entry_start + 8].try_into().ok()?,
349                    ) as usize;
350
351                    let val_len =
352                        u32::from_le_bytes(payload[offset..offset + 4].try_into().ok()?) as usize;
353                    let val_data = &payload[offset + 4..offset + 4 + val_len];
354                    fields.insert(tag, self.deserialize(val_data)?);
355                }
356                Some(DsValue::Object { schema_id, fields })
357            }
358            ValueType::Raw => {
359                let mut data = Bytes::copy_from_slice(payload);
360                yyds_types::layout::DsValueDecoder::decode(&mut data).ok()
361            }
362        }
363    }
364}