Skip to main content

hematite/catalog/
serialization.rs

1//! Relational row and index-key encoding.
2
3use crate::catalog::{
4    DateTimeValue, DateValue, DecimalValue, TimeValue, TimeWithTimeZoneValue, Value,
5};
6use crate::error::{HematiteError, Result};
7
8use super::record::StoredRow;
9
/// Stateless codec for the length-prefixed stored-row byte format.
///
/// The type carries no data; encoding and decoding are exposed as associated functions.
pub struct RowCodec;
11
12impl RowCodec {
13    pub fn encode_values(values: &[Value]) -> Result<Vec<u8>> {
14        Self::encode_stored_row(&StoredRow {
15            row_id: 0,
16            values: values.to_vec(),
17        })
18    }
19
20    pub fn encode_stored_row(row: &StoredRow) -> Result<Vec<u8>> {
21        let mut buffer = Vec::new();
22        buffer.extend_from_slice(&(0u32).to_le_bytes());
23        buffer.extend_from_slice(&row.row_id.to_le_bytes());
24        buffer.extend_from_slice(&(row.values.len() as u32).to_le_bytes());
25
26        for value in &row.values {
27            match value {
28                Value::Integer(i) => {
29                    buffer.push(1);
30                    buffer.extend_from_slice(&i.to_le_bytes());
31                }
32                Value::BigInt(i) => {
33                    buffer.push(6);
34                    buffer.extend_from_slice(&i.to_le_bytes());
35                }
36                Value::Int128(i) => {
37                    buffer.push(17);
38                    buffer.extend_from_slice(&i.to_le_bytes());
39                }
40                Value::UInteger(i) => {
41                    buffer.push(18);
42                    buffer.extend_from_slice(&i.to_le_bytes());
43                }
44                Value::UBigInt(i) => {
45                    buffer.push(19);
46                    buffer.extend_from_slice(&i.to_le_bytes());
47                }
48                Value::UInt128(i) => {
49                    buffer.push(20);
50                    buffer.extend_from_slice(&i.to_le_bytes());
51                }
52                Value::Text(s) => {
53                    buffer.push(2);
54                    write_bytes(&mut buffer, s.as_bytes());
55                }
56                Value::Enum(s) => {
57                    buffer.push(11);
58                    write_bytes(&mut buffer, s.as_bytes());
59                }
60                Value::Boolean(b) => {
61                    buffer.push(3);
62                    buffer.push(u8::from(*b));
63                }
64                Value::Float32(f) => {
65                    buffer.push(21);
66                    buffer.extend_from_slice(&f.to_le_bytes());
67                }
68                Value::Float(f) => {
69                    buffer.push(4);
70                    buffer.extend_from_slice(&f.to_le_bytes());
71                }
72                Value::Decimal(decimal) => {
73                    buffer.push(7);
74                    write_decimal(&mut buffer, decimal);
75                }
76                Value::Blob(bytes) => {
77                    buffer.push(8);
78                    write_bytes(&mut buffer, bytes);
79                }
80                Value::Date(date) => {
81                    buffer.push(9);
82                    buffer.extend_from_slice(&date.days_since_epoch().to_le_bytes());
83                }
84                Value::Time(time) => {
85                    buffer.push(12);
86                    buffer.extend_from_slice(&time.seconds_since_midnight().to_le_bytes());
87                }
88                Value::DateTime(datetime) => {
89                    buffer.push(10);
90                    buffer.extend_from_slice(&datetime.seconds_since_epoch().to_le_bytes());
91                }
92                Value::TimeWithTimeZone(value) => {
93                    buffer.push(14);
94                    buffer.extend_from_slice(&value.seconds_since_midnight().to_le_bytes());
95                    buffer.extend_from_slice(&value.offset_minutes().to_le_bytes());
96                }
97                Value::IntervalYearMonth(value) => {
98                    buffer.push(15);
99                    buffer.extend_from_slice(&value.total_months().to_le_bytes());
100                }
101                Value::IntervalDaySecond(value) => {
102                    buffer.push(16);
103                    buffer.extend_from_slice(&value.total_seconds().to_le_bytes());
104                }
105                Value::Null => buffer.push(5),
106            }
107        }
108
109        let payload_len = buffer.len() - 4;
110        buffer[0..4].copy_from_slice(&(payload_len as u32).to_le_bytes());
111        Ok(buffer)
112    }
113
114    pub fn decode_values(data: &[u8]) -> Result<Vec<Value>> {
115        let encoded = if data.len() >= 4 {
116            let payload_len = Self::read_payload_length(&data[0..4])?;
117            if payload_len + 4 == data.len() {
118                data.to_vec()
119            } else {
120                let mut encoded = Vec::with_capacity(data.len() + 4);
121                encoded.extend_from_slice(&(data.len() as u32).to_le_bytes());
122                encoded.extend_from_slice(data);
123                encoded
124            }
125        } else {
126            let mut encoded = Vec::with_capacity(data.len() + 4);
127            encoded.extend_from_slice(&(data.len() as u32).to_le_bytes());
128            encoded.extend_from_slice(data);
129            encoded
130        };
131
132        Ok(Self::decode_stored_row(&encoded)?.values)
133    }
134
135    pub fn decode_stored_row(data: &[u8]) -> Result<StoredRow> {
136        if data.len() < 12 {
137            return Err(HematiteError::CorruptedData(
138                "Stored row header is truncated".to_string(),
139            ));
140        }
141
142        let mut offset = 0usize;
143        let payload_len = Self::read_payload_length(&data[0..4])?;
144        offset += 4;
145
146        if payload_len + 4 > data.len() {
147            return Err(HematiteError::CorruptedData(
148                "Stored row length exceeds available bytes".to_string(),
149            ));
150        }
151
152        let row_id = u64::from_le_bytes(data[offset..offset + 8].try_into().map_err(|_| {
153            HematiteError::CorruptedData("Stored row rowid is truncated".to_string())
154        })?);
155        offset += 8;
156
157        let value_count = u32::from_le_bytes(data[offset..offset + 4].try_into().map_err(|_| {
158            HematiteError::CorruptedData("Stored row value count is truncated".to_string())
159        })?) as usize;
160        offset += 4;
161
162        let payload_end = payload_len + 4;
163        let mut values = Vec::with_capacity(value_count);
164
165        for _ in 0..value_count {
166            if offset >= payload_end {
167                return Err(HematiteError::CorruptedData(
168                    "Stored row ended before all values were decoded".to_string(),
169                ));
170            }
171
172            let tag = data[offset];
173            offset += 1;
174            let value = match tag {
175                1 => {
176                    let bytes = read_exact(data, &mut offset, payload_end, 4, "Integer value")?;
177                    Value::Integer(i32::from_le_bytes(bytes.try_into().unwrap()))
178                }
179                2 => {
180                    let bytes = read_bytes(data, &mut offset, payload_end, "Text value")?;
181                    let text = String::from_utf8(bytes).map_err(|_| {
182                        HematiteError::CorruptedData("Invalid UTF-8 in text value".to_string())
183                    })?;
184                    Value::Text(text)
185                }
186                11 => {
187                    let bytes = read_bytes(data, &mut offset, payload_end, "Enum value")?;
188                    let text = String::from_utf8(bytes).map_err(|_| {
189                        HematiteError::CorruptedData("Invalid UTF-8 in enum value".to_string())
190                    })?;
191                    Value::Enum(text)
192                }
193                3 => {
194                    let bytes = read_exact(data, &mut offset, payload_end, 1, "Boolean value")?;
195                    Value::Boolean(bytes[0] != 0)
196                }
197                4 => {
198                    let bytes = read_exact(data, &mut offset, payload_end, 8, "Float value")?;
199                    Value::Float(f64::from_le_bytes(bytes.try_into().unwrap()))
200                }
201                21 => {
202                    let bytes = read_exact(data, &mut offset, payload_end, 4, "Float32 value")?;
203                    Value::Float32(f32::from_le_bytes(bytes.try_into().unwrap()))
204                }
205                5 => Value::Null,
206                6 => {
207                    let bytes = read_exact(data, &mut offset, payload_end, 8, "BigInt value")?;
208                    Value::BigInt(i64::from_le_bytes(bytes.try_into().unwrap()))
209                }
210                17 => {
211                    let bytes = read_exact(data, &mut offset, payload_end, 16, "Int128 value")?;
212                    Value::Int128(i128::from_le_bytes(bytes.try_into().unwrap()))
213                }
214                18 => {
215                    let bytes = read_exact(data, &mut offset, payload_end, 4, "UInt value")?;
216                    Value::UInteger(u32::from_le_bytes(bytes.try_into().unwrap()))
217                }
218                19 => {
219                    let bytes = read_exact(data, &mut offset, payload_end, 8, "UInt64 value")?;
220                    Value::UBigInt(u64::from_le_bytes(bytes.try_into().unwrap()))
221                }
222                20 => {
223                    let bytes = read_exact(data, &mut offset, payload_end, 16, "UInt128 value")?;
224                    Value::UInt128(u128::from_le_bytes(bytes.try_into().unwrap()))
225                }
226                7 => Value::Decimal(read_decimal(data, &mut offset, payload_end)?),
227                8 => Value::Blob(read_bytes(data, &mut offset, payload_end, "Blob value")?),
228                9 => {
229                    let bytes = read_exact(data, &mut offset, payload_end, 4, "Date value")?;
230                    Value::Date(DateValue::from_days_since_epoch(i32::from_le_bytes(
231                        bytes.try_into().unwrap(),
232                    )))
233                }
234                12 => {
235                    let bytes = read_exact(data, &mut offset, payload_end, 4, "Time value")?;
236                    Value::Time(TimeValue::from_seconds_since_midnight(u32::from_le_bytes(
237                        bytes.try_into().unwrap(),
238                    )))
239                }
240                10 => {
241                    let bytes = read_exact(data, &mut offset, payload_end, 8, "DateTime value")?;
242                    Value::DateTime(DateTimeValue::from_seconds_since_epoch(i64::from_le_bytes(
243                        bytes.try_into().unwrap(),
244                    )))
245                }
246                14 => {
247                    let seconds = u32::from_le_bytes(
248                        read_exact(
249                            data,
250                            &mut offset,
251                            payload_end,
252                            4,
253                            "Time with time zone seconds",
254                        )?
255                        .try_into()
256                        .unwrap(),
257                    );
258                    let offset_minutes = i16::from_le_bytes(
259                        read_exact(
260                            data,
261                            &mut offset,
262                            payload_end,
263                            2,
264                            "Time with time zone offset",
265                        )?
266                        .try_into()
267                        .unwrap(),
268                    );
269                    Value::TimeWithTimeZone(TimeWithTimeZoneValue::from_parts(
270                        seconds,
271                        offset_minutes,
272                    ))
273                }
274                15 => {
275                    let bytes =
276                        read_exact(data, &mut offset, payload_end, 4, "Interval year-month")?;
277                    Value::IntervalYearMonth(crate::catalog::IntervalYearMonthValue::new(
278                        i32::from_le_bytes(bytes.try_into().unwrap()),
279                    ))
280                }
281                16 => {
282                    let bytes =
283                        read_exact(data, &mut offset, payload_end, 8, "Interval day-second")?;
284                    Value::IntervalDaySecond(crate::catalog::IntervalDaySecondValue::new(
285                        i64::from_le_bytes(bytes.try_into().unwrap()),
286                    ))
287                }
288                _ => {
289                    return Err(HematiteError::CorruptedData(format!(
290                        "Unknown value tag {} in stored row",
291                        tag
292                    )))
293                }
294            };
295
296            values.push(value);
297        }
298
299        Ok(StoredRow { row_id, values })
300    }
301
302    pub fn read_payload_length(prefix: &[u8]) -> Result<usize> {
303        if prefix.len() != 4 {
304            return Err(HematiteError::CorruptedData(
305                "Row length prefix must be 4 bytes".to_string(),
306            ));
307        }
308
309        Ok(u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize)
310    }
311}
312
313pub struct IndexKeyCodec;
314
315impl IndexKeyCodec {
316    pub fn encode_key(values: &[Value]) -> Result<Vec<u8>> {
317        let mut buffer = Vec::new();
318        for value in values {
319            encode_key_value(&mut buffer, value);
320        }
321        Ok(buffer)
322    }
323
324    pub fn encode_secondary_key(values: &[Value], row_id: u64) -> Result<Vec<u8>> {
325        let mut key = Self::encode_key(values)?;
326        key.extend_from_slice(&row_id.to_be_bytes());
327        Ok(key)
328    }
329
330    pub fn decode_row_id(value: &[u8]) -> Result<u64> {
331        if value.len() != 8 {
332            return Err(HematiteError::CorruptedData(
333                "Index rowid payload must be exactly 8 bytes".to_string(),
334            ));
335        }
336        let mut bytes = [0u8; 8];
337        bytes.copy_from_slice(value);
338        Ok(u64::from_be_bytes(bytes))
339    }
340
341    pub fn split_secondary_key(key: &[u8]) -> Result<(Vec<u8>, u64)> {
342        if key.len() < 8 {
343            return Err(HematiteError::CorruptedData(
344                "Index entry is missing rowid bytes".to_string(),
345            ));
346        }
347        let mut row_id_bytes = [0u8; 8];
348        row_id_bytes.copy_from_slice(&key[key.len() - 8..]);
349        let row_id = u64::from_be_bytes(row_id_bytes);
350        Ok((key[..key.len() - 8].to_vec(), row_id))
351    }
352}
353
354pub struct RowSerializer;
355
356impl RowSerializer {
357    pub fn serialize(values: &[Value]) -> Result<Vec<u8>> {
358        RowCodec::encode_values(values)
359    }
360
361    pub fn serialize_stored_row(row: &StoredRow) -> Result<Vec<u8>> {
362        RowCodec::encode_stored_row(row)
363    }
364
365    pub fn deserialize(data: &[u8]) -> Result<Vec<Value>> {
366        RowCodec::decode_values(data)
367    }
368
369    pub fn deserialize_stored_row(data: &[u8]) -> Result<StoredRow> {
370        RowCodec::decode_stored_row(data)
371    }
372
373    pub fn read_row_length(prefix: &[u8]) -> Result<usize> {
374        RowCodec::read_payload_length(prefix)
375    }
376}
377
378fn encode_key_value(buffer: &mut Vec<u8>, value: &Value) {
379    match value {
380        Value::Null => buffer.push(0),
381        Value::Boolean(false) => buffer.push(1),
382        Value::Boolean(true) => buffer.push(2),
383        Value::Integer(value) => {
384            buffer.push(3);
385            buffer.extend_from_slice(&(i32::to_be_bytes(*value ^ i32::MIN)));
386        }
387        Value::BigInt(value) => {
388            buffer.push(4);
389            buffer.extend_from_slice(&(i64::to_be_bytes(*value ^ i64::MIN)));
390        }
391        Value::Int128(value) => {
392            buffer.push(17);
393            buffer.extend_from_slice(&(i128::to_be_bytes(*value ^ i128::MIN)));
394        }
395        Value::UInteger(value) => {
396            buffer.push(18);
397            buffer.extend_from_slice(&value.to_be_bytes());
398        }
399        Value::UBigInt(value) => {
400            buffer.push(19);
401            buffer.extend_from_slice(&value.to_be_bytes());
402        }
403        Value::UInt128(value) => {
404            buffer.push(20);
405            buffer.extend_from_slice(&value.to_be_bytes());
406        }
407        Value::Float32(value) => {
408            buffer.push(21);
409            buffer.extend_from_slice(&ordered_f32_bytes(*value));
410        }
411        Value::Float(value) => {
412            buffer.push(5);
413            buffer.extend_from_slice(&ordered_f64_bytes(*value));
414        }
415        Value::Decimal(value) => {
416            buffer.push(6);
417            buffer.push(u8::from(value.negative()));
418            buffer.extend_from_slice(&value.scale().to_be_bytes());
419            buffer.extend_from_slice(&(value.digit_bytes().len() as u32).to_be_bytes());
420            write_packed_digits(buffer, value.digit_bytes());
421        }
422        Value::Text(value) => {
423            buffer.push(7);
424            write_bytes(buffer, value.as_bytes());
425        }
426        Value::Enum(value) => {
427            buffer.push(11);
428            write_bytes(buffer, value.as_bytes());
429        }
430        Value::Blob(value) => {
431            buffer.push(8);
432            write_bytes(buffer, value);
433        }
434        Value::Date(value) => {
435            buffer.push(9);
436            buffer.extend_from_slice(&(i32::to_be_bytes(value.days_since_epoch() ^ i32::MIN)));
437        }
438        Value::Time(value) => {
439            buffer.push(12);
440            buffer.extend_from_slice(&value.seconds_since_midnight().to_be_bytes());
441        }
442        Value::DateTime(value) => {
443            buffer.push(10);
444            buffer.extend_from_slice(&(i64::to_be_bytes(value.seconds_since_epoch() ^ i64::MIN)));
445        }
446        Value::TimeWithTimeZone(value) => {
447            buffer.push(14);
448            buffer.extend_from_slice(&value.seconds_since_midnight().to_be_bytes());
449            buffer.extend_from_slice(&(i16::to_be_bytes(value.offset_minutes() ^ i16::MIN)));
450        }
451        Value::IntervalYearMonth(value) => {
452            buffer.push(15);
453            buffer.extend_from_slice(&(i32::to_be_bytes(value.total_months() ^ i32::MIN)));
454        }
455        Value::IntervalDaySecond(value) => {
456            buffer.push(16);
457            buffer.extend_from_slice(&(i64::to_be_bytes(value.total_seconds() ^ i64::MIN)));
458        }
459    }
460}
461
/// Maps an `f64` to eight big-endian bytes whose unsigned byte order follows the order of
/// the float's bit pattern: values with the sign bit clear get it set, values with the sign
/// bit set have every bit inverted.
fn ordered_f64_bytes(value: f64) -> [u8; 8] {
    const SIGN_BIT: u64 = 1 << 63;
    let raw = value.to_bits();
    // XOR with all ones is a full inversion; XOR with the sign bit only flips that bit.
    let mask = if raw & SIGN_BIT == 0 { SIGN_BIT } else { u64::MAX };
    (raw ^ mask).to_be_bytes()
}
471
/// Maps an `f32` to four big-endian bytes whose unsigned byte order follows the order of
/// the float's bit pattern: values with the sign bit clear get it set, values with the sign
/// bit set have every bit inverted.
fn ordered_f32_bytes(value: f32) -> [u8; 4] {
    const SIGN_BIT: u32 = 1 << 31;
    let raw = value.to_bits();
    // XOR with all ones is a full inversion; XOR with the sign bit only flips that bit.
    let mask = if raw & SIGN_BIT == 0 { SIGN_BIT } else { u32::MAX };
    (raw ^ mask).to_be_bytes()
}
481
/// Appends `bytes` to `buffer`, preceded by its length as a little-endian `u32`.
///
/// The length is narrowed with `as u32`, so only the low 32 bits of the length are written.
fn write_bytes(buffer: &mut Vec<u8>, bytes: &[u8]) {
    let length_prefix = (bytes.len() as u32).to_le_bytes();
    buffer.reserve(length_prefix.len() + bytes.len());
    buffer.extend_from_slice(&length_prefix);
    buffer.extend_from_slice(bytes);
}
486
487fn read_bytes(data: &[u8], offset: &mut usize, end: usize, label: &str) -> Result<Vec<u8>> {
488    let len_bytes = read_exact(data, offset, end, 4, &format!("{label} length"))?;
489    let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
490    Ok(read_exact(data, offset, end, len, label)?.to_vec())
491}
492
493fn read_exact<'a>(
494    data: &'a [u8],
495    offset: &mut usize,
496    end: usize,
497    len: usize,
498    label: &str,
499) -> Result<&'a [u8]> {
500    if *offset + len > end {
501        return Err(HematiteError::CorruptedData(format!(
502            "{} is truncated",
503            label
504        )));
505    }
506    let bytes = &data[*offset..*offset + len];
507    *offset += len;
508    Ok(bytes)
509}
510
511fn write_decimal(buffer: &mut Vec<u8>, value: &DecimalValue) {
512    buffer.push(u8::from(value.negative()));
513    buffer.extend_from_slice(&value.scale().to_le_bytes());
514    buffer.extend_from_slice(&(value.digit_bytes().len() as u32).to_le_bytes());
515    write_packed_digits(buffer, value.digit_bytes());
516}
517
518fn read_decimal(data: &[u8], offset: &mut usize, end: usize) -> Result<DecimalValue> {
519    let sign = read_exact(data, offset, end, 1, "Decimal sign")?[0] != 0;
520    let scale = u32::from_le_bytes(
521        read_exact(data, offset, end, 4, "Decimal scale")?
522            .try_into()
523            .unwrap(),
524    );
525    let digit_count = u32::from_le_bytes(
526        read_exact(data, offset, end, 4, "Decimal digit count")?
527            .try_into()
528            .unwrap(),
529    ) as usize;
530    let packed_len = digit_count.div_ceil(2);
531    let packed = read_exact(data, offset, end, packed_len, "Decimal digits")?;
532    let digits = read_packed_digits(packed, digit_count)?;
533    let mut decimal = DecimalValue::parse(&format_decimal_digits(sign, &digits, scale as usize))?;
534    if decimal.digit_bytes().len() == 1 && decimal.digit_bytes()[0] == 0 {
535        decimal = DecimalValue::zero();
536    }
537    Ok(decimal)
538}
539
/// Renders decimal digits (values `0..=9`, most significant first) as text with `scale`
/// fractional digits.
///
/// A leading `-` is emitted for negative values unless the digits are exactly `[0]`. When
/// there are no more digits than `scale`, the result is `0.` followed by zero padding and
/// the digits.
fn format_decimal_digits(negative: bool, digits: &[u8], scale: usize) -> String {
    let sign = if negative && !matches!(digits, [0]) {
        "-"
    } else {
        ""
    };
    let rendered: String = digits.iter().map(|&digit| char::from(b'0' + digit)).collect();
    let width = rendered.len();

    if scale == 0 {
        format!("{sign}{rendered}")
    } else if width <= scale {
        let padding = "0".repeat(scale - width);
        format!("{sign}0.{padding}{rendered}")
    } else {
        let (whole, fraction) = rendered.split_at(width - scale);
        format!("{sign}{whole}.{fraction}")
    }
}
567
/// Packs digits two per byte — first digit in the high nibble, second in the low nibble —
/// and appends the result to `buffer`. An odd trailing digit is paired with the filler
/// nibble `0xF`. Only the low four bits of each input digit are kept.
fn write_packed_digits(buffer: &mut Vec<u8>, digits: &[u8]) {
    buffer.extend(digits.chunks(2).map(|pair| {
        let upper = pair[0] & 0x0F;
        let lower = pair.get(1).map_or(0x0F, |digit| digit & 0x0F);
        (upper << 4) | lower
    }));
}
579
580fn read_packed_digits(bytes: &[u8], digit_count: usize) -> Result<Vec<u8>> {
581    let mut digits = Vec::with_capacity(digit_count);
582    for byte in bytes {
583        digits.push((byte >> 4) & 0x0F);
584        if digits.len() == digit_count {
585            break;
586        }
587        let low = byte & 0x0F;
588        if low <= 9 {
589            digits.push(low);
590        }
591        if digits.len() == digit_count {
592            break;
593        }
594    }
595    if digits.len() != digit_count || digits.iter().any(|digit| *digit > 9) {
596        return Err(HematiteError::CorruptedData(
597            "Packed decimal digits are invalid".to_string(),
598        ));
599    }
600    Ok(digits)
601}