1use crate::catalog::{
4 DateTimeValue, DateValue, DecimalValue, TimeValue, TimeWithTimeZoneValue, Value,
5};
6use crate::error::{HematiteError, Result};
7
8use super::record::StoredRow;
9
10pub struct RowCodec;
11
12impl RowCodec {
13 pub fn encode_values(values: &[Value]) -> Result<Vec<u8>> {
14 Self::encode_stored_row(&StoredRow {
15 row_id: 0,
16 values: values.to_vec(),
17 })
18 }
19
20 pub fn encode_stored_row(row: &StoredRow) -> Result<Vec<u8>> {
21 let mut buffer = Vec::new();
22 buffer.extend_from_slice(&(0u32).to_le_bytes());
23 buffer.extend_from_slice(&row.row_id.to_le_bytes());
24 buffer.extend_from_slice(&(row.values.len() as u32).to_le_bytes());
25
26 for value in &row.values {
27 match value {
28 Value::Integer(i) => {
29 buffer.push(1);
30 buffer.extend_from_slice(&i.to_le_bytes());
31 }
32 Value::BigInt(i) => {
33 buffer.push(6);
34 buffer.extend_from_slice(&i.to_le_bytes());
35 }
36 Value::Int128(i) => {
37 buffer.push(17);
38 buffer.extend_from_slice(&i.to_le_bytes());
39 }
40 Value::UInteger(i) => {
41 buffer.push(18);
42 buffer.extend_from_slice(&i.to_le_bytes());
43 }
44 Value::UBigInt(i) => {
45 buffer.push(19);
46 buffer.extend_from_slice(&i.to_le_bytes());
47 }
48 Value::UInt128(i) => {
49 buffer.push(20);
50 buffer.extend_from_slice(&i.to_le_bytes());
51 }
52 Value::Text(s) => {
53 buffer.push(2);
54 write_bytes(&mut buffer, s.as_bytes());
55 }
56 Value::Enum(s) => {
57 buffer.push(11);
58 write_bytes(&mut buffer, s.as_bytes());
59 }
60 Value::Boolean(b) => {
61 buffer.push(3);
62 buffer.push(u8::from(*b));
63 }
64 Value::Float32(f) => {
65 buffer.push(21);
66 buffer.extend_from_slice(&f.to_le_bytes());
67 }
68 Value::Float(f) => {
69 buffer.push(4);
70 buffer.extend_from_slice(&f.to_le_bytes());
71 }
72 Value::Decimal(decimal) => {
73 buffer.push(7);
74 write_decimal(&mut buffer, decimal);
75 }
76 Value::Blob(bytes) => {
77 buffer.push(8);
78 write_bytes(&mut buffer, bytes);
79 }
80 Value::Date(date) => {
81 buffer.push(9);
82 buffer.extend_from_slice(&date.days_since_epoch().to_le_bytes());
83 }
84 Value::Time(time) => {
85 buffer.push(12);
86 buffer.extend_from_slice(&time.seconds_since_midnight().to_le_bytes());
87 }
88 Value::DateTime(datetime) => {
89 buffer.push(10);
90 buffer.extend_from_slice(&datetime.seconds_since_epoch().to_le_bytes());
91 }
92 Value::TimeWithTimeZone(value) => {
93 buffer.push(14);
94 buffer.extend_from_slice(&value.seconds_since_midnight().to_le_bytes());
95 buffer.extend_from_slice(&value.offset_minutes().to_le_bytes());
96 }
97 Value::IntervalYearMonth(value) => {
98 buffer.push(15);
99 buffer.extend_from_slice(&value.total_months().to_le_bytes());
100 }
101 Value::IntervalDaySecond(value) => {
102 buffer.push(16);
103 buffer.extend_from_slice(&value.total_seconds().to_le_bytes());
104 }
105 Value::Null => buffer.push(5),
106 }
107 }
108
109 let payload_len = buffer.len() - 4;
110 buffer[0..4].copy_from_slice(&(payload_len as u32).to_le_bytes());
111 Ok(buffer)
112 }
113
114 pub fn decode_values(data: &[u8]) -> Result<Vec<Value>> {
115 let encoded = if data.len() >= 4 {
116 let payload_len = Self::read_payload_length(&data[0..4])?;
117 if payload_len + 4 == data.len() {
118 data.to_vec()
119 } else {
120 let mut encoded = Vec::with_capacity(data.len() + 4);
121 encoded.extend_from_slice(&(data.len() as u32).to_le_bytes());
122 encoded.extend_from_slice(data);
123 encoded
124 }
125 } else {
126 let mut encoded = Vec::with_capacity(data.len() + 4);
127 encoded.extend_from_slice(&(data.len() as u32).to_le_bytes());
128 encoded.extend_from_slice(data);
129 encoded
130 };
131
132 Ok(Self::decode_stored_row(&encoded)?.values)
133 }
134
135 pub fn decode_stored_row(data: &[u8]) -> Result<StoredRow> {
136 if data.len() < 12 {
137 return Err(HematiteError::CorruptedData(
138 "Stored row header is truncated".to_string(),
139 ));
140 }
141
142 let mut offset = 0usize;
143 let payload_len = Self::read_payload_length(&data[0..4])?;
144 offset += 4;
145
146 if payload_len + 4 > data.len() {
147 return Err(HematiteError::CorruptedData(
148 "Stored row length exceeds available bytes".to_string(),
149 ));
150 }
151
152 let row_id = u64::from_le_bytes(data[offset..offset + 8].try_into().map_err(|_| {
153 HematiteError::CorruptedData("Stored row rowid is truncated".to_string())
154 })?);
155 offset += 8;
156
157 let value_count = u32::from_le_bytes(data[offset..offset + 4].try_into().map_err(|_| {
158 HematiteError::CorruptedData("Stored row value count is truncated".to_string())
159 })?) as usize;
160 offset += 4;
161
162 let payload_end = payload_len + 4;
163 let mut values = Vec::with_capacity(value_count);
164
165 for _ in 0..value_count {
166 if offset >= payload_end {
167 return Err(HematiteError::CorruptedData(
168 "Stored row ended before all values were decoded".to_string(),
169 ));
170 }
171
172 let tag = data[offset];
173 offset += 1;
174 let value = match tag {
175 1 => {
176 let bytes = read_exact(data, &mut offset, payload_end, 4, "Integer value")?;
177 Value::Integer(i32::from_le_bytes(bytes.try_into().unwrap()))
178 }
179 2 => {
180 let bytes = read_bytes(data, &mut offset, payload_end, "Text value")?;
181 let text = String::from_utf8(bytes).map_err(|_| {
182 HematiteError::CorruptedData("Invalid UTF-8 in text value".to_string())
183 })?;
184 Value::Text(text)
185 }
186 11 => {
187 let bytes = read_bytes(data, &mut offset, payload_end, "Enum value")?;
188 let text = String::from_utf8(bytes).map_err(|_| {
189 HematiteError::CorruptedData("Invalid UTF-8 in enum value".to_string())
190 })?;
191 Value::Enum(text)
192 }
193 3 => {
194 let bytes = read_exact(data, &mut offset, payload_end, 1, "Boolean value")?;
195 Value::Boolean(bytes[0] != 0)
196 }
197 4 => {
198 let bytes = read_exact(data, &mut offset, payload_end, 8, "Float value")?;
199 Value::Float(f64::from_le_bytes(bytes.try_into().unwrap()))
200 }
201 21 => {
202 let bytes = read_exact(data, &mut offset, payload_end, 4, "Float32 value")?;
203 Value::Float32(f32::from_le_bytes(bytes.try_into().unwrap()))
204 }
205 5 => Value::Null,
206 6 => {
207 let bytes = read_exact(data, &mut offset, payload_end, 8, "BigInt value")?;
208 Value::BigInt(i64::from_le_bytes(bytes.try_into().unwrap()))
209 }
210 17 => {
211 let bytes = read_exact(data, &mut offset, payload_end, 16, "Int128 value")?;
212 Value::Int128(i128::from_le_bytes(bytes.try_into().unwrap()))
213 }
214 18 => {
215 let bytes = read_exact(data, &mut offset, payload_end, 4, "UInt value")?;
216 Value::UInteger(u32::from_le_bytes(bytes.try_into().unwrap()))
217 }
218 19 => {
219 let bytes = read_exact(data, &mut offset, payload_end, 8, "UInt64 value")?;
220 Value::UBigInt(u64::from_le_bytes(bytes.try_into().unwrap()))
221 }
222 20 => {
223 let bytes = read_exact(data, &mut offset, payload_end, 16, "UInt128 value")?;
224 Value::UInt128(u128::from_le_bytes(bytes.try_into().unwrap()))
225 }
226 7 => Value::Decimal(read_decimal(data, &mut offset, payload_end)?),
227 8 => Value::Blob(read_bytes(data, &mut offset, payload_end, "Blob value")?),
228 9 => {
229 let bytes = read_exact(data, &mut offset, payload_end, 4, "Date value")?;
230 Value::Date(DateValue::from_days_since_epoch(i32::from_le_bytes(
231 bytes.try_into().unwrap(),
232 )))
233 }
234 12 => {
235 let bytes = read_exact(data, &mut offset, payload_end, 4, "Time value")?;
236 Value::Time(TimeValue::from_seconds_since_midnight(u32::from_le_bytes(
237 bytes.try_into().unwrap(),
238 )))
239 }
240 10 => {
241 let bytes = read_exact(data, &mut offset, payload_end, 8, "DateTime value")?;
242 Value::DateTime(DateTimeValue::from_seconds_since_epoch(i64::from_le_bytes(
243 bytes.try_into().unwrap(),
244 )))
245 }
246 14 => {
247 let seconds = u32::from_le_bytes(
248 read_exact(
249 data,
250 &mut offset,
251 payload_end,
252 4,
253 "Time with time zone seconds",
254 )?
255 .try_into()
256 .unwrap(),
257 );
258 let offset_minutes = i16::from_le_bytes(
259 read_exact(
260 data,
261 &mut offset,
262 payload_end,
263 2,
264 "Time with time zone offset",
265 )?
266 .try_into()
267 .unwrap(),
268 );
269 Value::TimeWithTimeZone(TimeWithTimeZoneValue::from_parts(
270 seconds,
271 offset_minutes,
272 ))
273 }
274 15 => {
275 let bytes =
276 read_exact(data, &mut offset, payload_end, 4, "Interval year-month")?;
277 Value::IntervalYearMonth(crate::catalog::IntervalYearMonthValue::new(
278 i32::from_le_bytes(bytes.try_into().unwrap()),
279 ))
280 }
281 16 => {
282 let bytes =
283 read_exact(data, &mut offset, payload_end, 8, "Interval day-second")?;
284 Value::IntervalDaySecond(crate::catalog::IntervalDaySecondValue::new(
285 i64::from_le_bytes(bytes.try_into().unwrap()),
286 ))
287 }
288 _ => {
289 return Err(HematiteError::CorruptedData(format!(
290 "Unknown value tag {} in stored row",
291 tag
292 )))
293 }
294 };
295
296 values.push(value);
297 }
298
299 Ok(StoredRow { row_id, values })
300 }
301
302 pub fn read_payload_length(prefix: &[u8]) -> Result<usize> {
303 if prefix.len() != 4 {
304 return Err(HematiteError::CorruptedData(
305 "Row length prefix must be 4 bytes".to_string(),
306 ));
307 }
308
309 Ok(u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize)
310 }
311}
312
313pub struct IndexKeyCodec;
314
315impl IndexKeyCodec {
316 pub fn encode_key(values: &[Value]) -> Result<Vec<u8>> {
317 let mut buffer = Vec::new();
318 for value in values {
319 encode_key_value(&mut buffer, value);
320 }
321 Ok(buffer)
322 }
323
324 pub fn encode_secondary_key(values: &[Value], row_id: u64) -> Result<Vec<u8>> {
325 let mut key = Self::encode_key(values)?;
326 key.extend_from_slice(&row_id.to_be_bytes());
327 Ok(key)
328 }
329
330 pub fn decode_row_id(value: &[u8]) -> Result<u64> {
331 if value.len() != 8 {
332 return Err(HematiteError::CorruptedData(
333 "Index rowid payload must be exactly 8 bytes".to_string(),
334 ));
335 }
336 let mut bytes = [0u8; 8];
337 bytes.copy_from_slice(value);
338 Ok(u64::from_be_bytes(bytes))
339 }
340
341 pub fn split_secondary_key(key: &[u8]) -> Result<(Vec<u8>, u64)> {
342 if key.len() < 8 {
343 return Err(HematiteError::CorruptedData(
344 "Index entry is missing rowid bytes".to_string(),
345 ));
346 }
347 let mut row_id_bytes = [0u8; 8];
348 row_id_bytes.copy_from_slice(&key[key.len() - 8..]);
349 let row_id = u64::from_be_bytes(row_id_bytes);
350 Ok((key[..key.len() - 8].to_vec(), row_id))
351 }
352}
353
354pub struct RowSerializer;
355
356impl RowSerializer {
357 pub fn serialize(values: &[Value]) -> Result<Vec<u8>> {
358 RowCodec::encode_values(values)
359 }
360
361 pub fn serialize_stored_row(row: &StoredRow) -> Result<Vec<u8>> {
362 RowCodec::encode_stored_row(row)
363 }
364
365 pub fn deserialize(data: &[u8]) -> Result<Vec<Value>> {
366 RowCodec::decode_values(data)
367 }
368
369 pub fn deserialize_stored_row(data: &[u8]) -> Result<StoredRow> {
370 RowCodec::decode_stored_row(data)
371 }
372
373 pub fn read_row_length(prefix: &[u8]) -> Result<usize> {
374 RowCodec::read_payload_length(prefix)
375 }
376}
377
378fn encode_key_value(buffer: &mut Vec<u8>, value: &Value) {
379 match value {
380 Value::Null => buffer.push(0),
381 Value::Boolean(false) => buffer.push(1),
382 Value::Boolean(true) => buffer.push(2),
383 Value::Integer(value) => {
384 buffer.push(3);
385 buffer.extend_from_slice(&(i32::to_be_bytes(*value ^ i32::MIN)));
386 }
387 Value::BigInt(value) => {
388 buffer.push(4);
389 buffer.extend_from_slice(&(i64::to_be_bytes(*value ^ i64::MIN)));
390 }
391 Value::Int128(value) => {
392 buffer.push(17);
393 buffer.extend_from_slice(&(i128::to_be_bytes(*value ^ i128::MIN)));
394 }
395 Value::UInteger(value) => {
396 buffer.push(18);
397 buffer.extend_from_slice(&value.to_be_bytes());
398 }
399 Value::UBigInt(value) => {
400 buffer.push(19);
401 buffer.extend_from_slice(&value.to_be_bytes());
402 }
403 Value::UInt128(value) => {
404 buffer.push(20);
405 buffer.extend_from_slice(&value.to_be_bytes());
406 }
407 Value::Float32(value) => {
408 buffer.push(21);
409 buffer.extend_from_slice(&ordered_f32_bytes(*value));
410 }
411 Value::Float(value) => {
412 buffer.push(5);
413 buffer.extend_from_slice(&ordered_f64_bytes(*value));
414 }
415 Value::Decimal(value) => {
416 buffer.push(6);
417 buffer.push(u8::from(value.negative()));
418 buffer.extend_from_slice(&value.scale().to_be_bytes());
419 buffer.extend_from_slice(&(value.digit_bytes().len() as u32).to_be_bytes());
420 write_packed_digits(buffer, value.digit_bytes());
421 }
422 Value::Text(value) => {
423 buffer.push(7);
424 write_bytes(buffer, value.as_bytes());
425 }
426 Value::Enum(value) => {
427 buffer.push(11);
428 write_bytes(buffer, value.as_bytes());
429 }
430 Value::Blob(value) => {
431 buffer.push(8);
432 write_bytes(buffer, value);
433 }
434 Value::Date(value) => {
435 buffer.push(9);
436 buffer.extend_from_slice(&(i32::to_be_bytes(value.days_since_epoch() ^ i32::MIN)));
437 }
438 Value::Time(value) => {
439 buffer.push(12);
440 buffer.extend_from_slice(&value.seconds_since_midnight().to_be_bytes());
441 }
442 Value::DateTime(value) => {
443 buffer.push(10);
444 buffer.extend_from_slice(&(i64::to_be_bytes(value.seconds_since_epoch() ^ i64::MIN)));
445 }
446 Value::TimeWithTimeZone(value) => {
447 buffer.push(14);
448 buffer.extend_from_slice(&value.seconds_since_midnight().to_be_bytes());
449 buffer.extend_from_slice(&(i16::to_be_bytes(value.offset_minutes() ^ i16::MIN)));
450 }
451 Value::IntervalYearMonth(value) => {
452 buffer.push(15);
453 buffer.extend_from_slice(&(i32::to_be_bytes(value.total_months() ^ i32::MIN)));
454 }
455 Value::IntervalDaySecond(value) => {
456 buffer.push(16);
457 buffer.extend_from_slice(&(i64::to_be_bytes(value.total_seconds() ^ i64::MIN)));
458 }
459 }
460}
461
/// Maps an `f64` to 8 big-endian bytes for use in index keys.
///
/// Values with a clear sign bit get that bit set; values with the sign bit set
/// have every bit inverted. For non-NaN inputs, comparing the resulting byte
/// arrays lexicographically matches comparing the floats numerically.
fn ordered_f64_bytes(value: f64) -> [u8; 8] {
    const SIGN_BIT: u64 = 1 << 63;
    let raw = value.to_bits();
    // XOR with all ones is bitwise NOT; XOR with SIGN_BIT flips only the sign.
    let mask = if raw & SIGN_BIT == 0 {
        SIGN_BIT
    } else {
        u64::MAX
    };
    (raw ^ mask).to_be_bytes()
}
471
/// Maps an `f32` to 4 big-endian bytes for use in index keys.
///
/// Values with a clear sign bit get that bit set; values with the sign bit set
/// have every bit inverted. For non-NaN inputs, comparing the resulting byte
/// arrays lexicographically matches comparing the floats numerically.
fn ordered_f32_bytes(value: f32) -> [u8; 4] {
    const SIGN_BIT: u32 = 1 << 31;
    let raw = value.to_bits();
    // XOR with all ones is bitwise NOT; XOR with SIGN_BIT flips only the sign.
    let mask = if raw & SIGN_BIT == 0 {
        SIGN_BIT
    } else {
        u32::MAX
    };
    (raw ^ mask).to_be_bytes()
}
481
/// Appends `bytes` to `buffer`, preceded by its length as a little-endian `u32`.
///
/// The length is converted with `as u32`, so a slice longer than `u32::MAX`
/// bytes gets a truncated prefix.
fn write_bytes(buffer: &mut Vec<u8>, bytes: &[u8]) {
    let prefix = (bytes.len() as u32).to_le_bytes();
    buffer.reserve(prefix.len() + bytes.len());
    buffer.extend_from_slice(&prefix);
    buffer.extend_from_slice(bytes);
}
486
487fn read_bytes(data: &[u8], offset: &mut usize, end: usize, label: &str) -> Result<Vec<u8>> {
488 let len_bytes = read_exact(data, offset, end, 4, &format!("{label} length"))?;
489 let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
490 Ok(read_exact(data, offset, end, len, label)?.to_vec())
491}
492
493fn read_exact<'a>(
494 data: &'a [u8],
495 offset: &mut usize,
496 end: usize,
497 len: usize,
498 label: &str,
499) -> Result<&'a [u8]> {
500 if *offset + len > end {
501 return Err(HematiteError::CorruptedData(format!(
502 "{} is truncated",
503 label
504 )));
505 }
506 let bytes = &data[*offset..*offset + len];
507 *offset += len;
508 Ok(bytes)
509}
510
511fn write_decimal(buffer: &mut Vec<u8>, value: &DecimalValue) {
512 buffer.push(u8::from(value.negative()));
513 buffer.extend_from_slice(&value.scale().to_le_bytes());
514 buffer.extend_from_slice(&(value.digit_bytes().len() as u32).to_le_bytes());
515 write_packed_digits(buffer, value.digit_bytes());
516}
517
518fn read_decimal(data: &[u8], offset: &mut usize, end: usize) -> Result<DecimalValue> {
519 let sign = read_exact(data, offset, end, 1, "Decimal sign")?[0] != 0;
520 let scale = u32::from_le_bytes(
521 read_exact(data, offset, end, 4, "Decimal scale")?
522 .try_into()
523 .unwrap(),
524 );
525 let digit_count = u32::from_le_bytes(
526 read_exact(data, offset, end, 4, "Decimal digit count")?
527 .try_into()
528 .unwrap(),
529 ) as usize;
530 let packed_len = digit_count.div_ceil(2);
531 let packed = read_exact(data, offset, end, packed_len, "Decimal digits")?;
532 let digits = read_packed_digits(packed, digit_count)?;
533 let mut decimal = DecimalValue::parse(&format_decimal_digits(sign, &digits, scale as usize))?;
534 if decimal.digit_bytes().len() == 1 && decimal.digit_bytes()[0] == 0 {
535 decimal = DecimalValue::zero();
536 }
537 Ok(decimal)
538}
539
/// Renders decimal digits (each `0..=9`) as text with `scale` fractional digits.
///
/// A leading `-` is emitted for negative values unless `digits` is exactly
/// `[0]`. When there are no more digits than `scale`, the integer part is `0`
/// and the fraction is left-padded with zeros.
fn format_decimal_digits(negative: bool, digits: &[u8], scale: usize) -> String {
    let sign = if negative && !matches!(digits, [0]) {
        "-"
    } else {
        ""
    };
    let rendered: String = digits.iter().map(|&digit| (b'0' + digit) as char).collect();

    match rendered.len().checked_sub(scale) {
        // No fractional part at all.
        _ if scale == 0 => format!("{sign}{rendered}"),
        // At least one integer digit: split at the decimal point.
        Some(split) if split > 0 => {
            let (whole, fraction) = rendered.split_at(split);
            format!("{sign}{whole}.{fraction}")
        }
        // Purely fractional: pad with zeros up to `scale` digits.
        _ => {
            let padding = "0".repeat(scale - rendered.len());
            format!("{sign}0.{padding}{rendered}")
        }
    }
}
567
/// Appends `digits` to `buffer` packed two per byte: the first digit of each
/// pair goes in the high nibble, the second in the low nibble. Only the low
/// four bits of each digit are kept. An unpaired final digit gets `0xF` as its
/// low nibble.
fn write_packed_digits(buffer: &mut Vec<u8>, digits: &[u8]) {
    buffer.extend(digits.chunks(2).map(|pair| {
        let high = pair[0] & 0x0F;
        let low = pair.get(1).map_or(0x0F, |digit| digit & 0x0F);
        (high << 4) | low
    }));
}
579
580fn read_packed_digits(bytes: &[u8], digit_count: usize) -> Result<Vec<u8>> {
581 let mut digits = Vec::with_capacity(digit_count);
582 for byte in bytes {
583 digits.push((byte >> 4) & 0x0F);
584 if digits.len() == digit_count {
585 break;
586 }
587 let low = byte & 0x0F;
588 if low <= 9 {
589 digits.push(low);
590 }
591 if digits.len() == digit_count {
592 break;
593 }
594 }
595 if digits.len() != digit_count || digits.iter().any(|digit| *digit > 9) {
596 return Err(HematiteError::CorruptedData(
597 "Packed decimal digits are invalid".to_string(),
598 ));
599 }
600 Ok(digits)
601}