reddb_server/storage/schema/
value_codec.rs1use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
36
37use super::types::{read_varint, write_varint, DataType, Value, ValueError};
38
39pub type ValueKind = DataType;
44
45#[inline]
51pub fn type_tag(value: &Value) -> u8 {
52 match value {
53 Value::Null => 0,
54 other => other.data_type().to_byte(),
55 }
56}
57
58#[inline]
61pub fn type_for_tag(tag: u8) -> Option<ValueKind> {
62 if tag == 0 {
63 Some(DataType::Nullable)
64 } else {
65 DataType::from_byte(tag)
66 }
67}
68
/// Payload size in bytes that a `Text` or `Blob` value must exceed (strictly
/// greater-than) before `encode` attempts zstd compression.
pub(super) const TOAST_THRESHOLD: usize = 2048;

/// Compression level that `encode` passes to `zstd::bulk::compress`.
pub(super) const TOAST_ZSTD_LEVEL: i32 = 3;
77
78pub fn encode(value: &Value, out: &mut Vec<u8>) {
83 match value {
84 Value::Null => {
85 out.push(0); }
87 Value::Integer(v) => {
88 out.push(DataType::Integer.to_byte());
89 out.extend_from_slice(&v.to_le_bytes());
90 }
91 Value::UnsignedInteger(v) => {
92 out.push(DataType::UnsignedInteger.to_byte());
93 out.extend_from_slice(&v.to_le_bytes());
94 }
95 Value::Float(v) => {
96 out.push(DataType::Float.to_byte());
97 out.extend_from_slice(&v.to_le_bytes());
98 }
99 Value::Text(s) => {
100 let bytes = s.as_bytes();
101 if bytes.len() > TOAST_THRESHOLD {
106 if let Ok(compressed) = zstd::bulk::compress(bytes, TOAST_ZSTD_LEVEL) {
107 if compressed.len() < bytes.len() {
108 out.push(DataType::TextZstd.to_byte());
109 write_varint(out, bytes.len() as u64);
111 write_varint(out, compressed.len() as u64);
112 out.extend_from_slice(&compressed);
113 return;
114 }
115 }
116 }
117 out.push(DataType::Text.to_byte());
118 write_varint(out, bytes.len() as u64);
119 out.extend_from_slice(bytes);
120 }
121 Value::Blob(data) => {
122 if data.len() > TOAST_THRESHOLD {
124 if let Ok(compressed) = zstd::bulk::compress(data, TOAST_ZSTD_LEVEL) {
125 if compressed.len() < data.len() {
126 out.push(DataType::BlobZstd.to_byte());
127 write_varint(out, data.len() as u64);
128 write_varint(out, compressed.len() as u64);
129 out.extend_from_slice(&compressed);
130 return;
131 }
132 }
133 }
134 out.push(DataType::Blob.to_byte());
135 write_varint(out, data.len() as u64);
136 out.extend_from_slice(data);
137 }
138 Value::Boolean(v) => {
139 out.push(DataType::Boolean.to_byte());
140 out.push(if *v { 1 } else { 0 });
141 }
142 Value::Timestamp(v) => {
143 out.push(DataType::Timestamp.to_byte());
144 out.extend_from_slice(&v.to_le_bytes());
145 }
146 Value::Duration(v) => {
147 out.push(DataType::Duration.to_byte());
148 out.extend_from_slice(&v.to_le_bytes());
149 }
150 Value::IpAddr(addr) => {
151 out.push(DataType::IpAddr.to_byte());
152 match addr {
153 IpAddr::V4(v4) => {
154 out.push(4); out.extend_from_slice(&v4.octets());
156 }
157 IpAddr::V6(v6) => {
158 out.push(6); out.extend_from_slice(&v6.octets());
160 }
161 }
162 }
163 Value::MacAddr(mac) => {
164 out.push(DataType::MacAddr.to_byte());
165 out.extend_from_slice(mac);
166 }
167 Value::Vector(vec) => {
168 out.push(DataType::Vector.to_byte());
169 write_varint(out, vec.len() as u64);
170 for v in vec {
171 out.extend_from_slice(&v.to_le_bytes());
172 }
173 }
174 Value::Json(data) => {
175 out.push(DataType::Json.to_byte());
176 write_varint(out, data.len() as u64);
177 out.extend_from_slice(data);
178 }
179 Value::Uuid(uuid) => {
180 out.push(DataType::Uuid.to_byte());
181 out.extend_from_slice(uuid);
182 }
183 Value::NodeRef(node_id) => {
184 out.push(DataType::NodeRef.to_byte());
185 let bytes = node_id.as_bytes();
186 write_varint(out, bytes.len() as u64);
187 out.extend_from_slice(bytes);
188 }
189 Value::EdgeRef(edge_id) => {
190 out.push(DataType::EdgeRef.to_byte());
191 let bytes = edge_id.as_bytes();
192 write_varint(out, bytes.len() as u64);
193 out.extend_from_slice(bytes);
194 }
195 Value::VectorRef(collection, vector_id) => {
196 out.push(DataType::VectorRef.to_byte());
197 let coll_bytes = collection.as_bytes();
198 write_varint(out, coll_bytes.len() as u64);
199 out.extend_from_slice(coll_bytes);
200 out.extend_from_slice(&vector_id.to_le_bytes());
201 }
202 Value::RowRef(table, row_id) => {
203 out.push(DataType::RowRef.to_byte());
204 let table_bytes = table.as_bytes();
205 write_varint(out, table_bytes.len() as u64);
206 out.extend_from_slice(table_bytes);
207 out.extend_from_slice(&row_id.to_le_bytes());
208 }
209 Value::Color(rgb) => {
210 out.push(DataType::Color.to_byte());
211 out.extend_from_slice(rgb);
212 }
213 Value::Email(s) => {
214 out.push(DataType::Email.to_byte());
215 let bytes = s.as_bytes();
216 write_varint(out, bytes.len() as u64);
217 out.extend_from_slice(bytes);
218 }
219 Value::Url(s) => {
220 out.push(DataType::Url.to_byte());
221 let bytes = s.as_bytes();
222 write_varint(out, bytes.len() as u64);
223 out.extend_from_slice(bytes);
224 }
225 Value::Phone(n) => {
226 out.push(DataType::Phone.to_byte());
227 out.extend_from_slice(&n.to_le_bytes());
228 }
229 Value::Semver(packed) => {
230 out.push(DataType::Semver.to_byte());
231 out.extend_from_slice(&packed.to_le_bytes());
232 }
233 Value::Cidr(ip, prefix) => {
234 out.push(DataType::Cidr.to_byte());
235 out.extend_from_slice(&ip.to_le_bytes());
236 out.push(*prefix);
237 }
238 Value::Date(days) => {
239 out.push(DataType::Date.to_byte());
240 out.extend_from_slice(&days.to_le_bytes());
241 }
242 Value::Time(ms) => {
243 out.push(DataType::Time.to_byte());
244 out.extend_from_slice(&ms.to_le_bytes());
245 }
246 Value::Decimal(v) => {
247 out.push(DataType::Decimal.to_byte());
248 out.extend_from_slice(&v.to_le_bytes());
249 }
250 Value::EnumValue(idx) => {
251 out.push(DataType::Enum.to_byte());
252 out.push(*idx);
253 }
254 Value::Array(elements) => {
255 out.push(DataType::Array.to_byte());
256 write_varint(out, elements.len() as u64);
257 for elem in elements {
258 encode(elem, out);
259 }
260 }
261 Value::TimestampMs(v) => {
262 out.push(DataType::TimestampMs.to_byte());
263 out.extend_from_slice(&v.to_le_bytes());
264 }
265 Value::Ipv4(v) => {
266 out.push(DataType::Ipv4.to_byte());
267 out.extend_from_slice(&v.to_le_bytes());
268 }
269 Value::Ipv6(bytes) => {
270 out.push(DataType::Ipv6.to_byte());
271 out.extend_from_slice(bytes);
272 }
273 Value::Subnet(ip, mask) => {
274 out.push(DataType::Subnet.to_byte());
275 out.extend_from_slice(&ip.to_le_bytes());
276 out.extend_from_slice(&mask.to_le_bytes());
277 }
278 Value::Port(v) => {
279 out.push(DataType::Port.to_byte());
280 out.extend_from_slice(&v.to_le_bytes());
281 }
282 Value::Latitude(v) => {
283 out.push(DataType::Latitude.to_byte());
284 out.extend_from_slice(&v.to_le_bytes());
285 }
286 Value::Longitude(v) => {
287 out.push(DataType::Longitude.to_byte());
288 out.extend_from_slice(&v.to_le_bytes());
289 }
290 Value::GeoPoint(lat, lon) => {
291 out.push(DataType::GeoPoint.to_byte());
292 out.extend_from_slice(&lat.to_le_bytes());
293 out.extend_from_slice(&lon.to_le_bytes());
294 }
295 Value::Country2(c) => {
296 out.push(DataType::Country2.to_byte());
297 out.extend_from_slice(c);
298 }
299 Value::Country3(c) => {
300 out.push(DataType::Country3.to_byte());
301 out.extend_from_slice(c);
302 }
303 Value::Lang2(c) => {
304 out.push(DataType::Lang2.to_byte());
305 out.extend_from_slice(c);
306 }
307 Value::Lang5(c) => {
308 out.push(DataType::Lang5.to_byte());
309 out.extend_from_slice(c);
310 }
311 Value::Currency(c) => {
312 out.push(DataType::Currency.to_byte());
313 out.extend_from_slice(c);
314 }
315 Value::AssetCode(code) => {
316 out.push(DataType::AssetCode.to_byte());
317 let bytes = code.as_bytes();
318 write_varint(out, bytes.len() as u64);
319 out.extend_from_slice(bytes);
320 }
321 Value::Money {
322 asset_code,
323 minor_units,
324 scale,
325 } => {
326 out.push(DataType::Money.to_byte());
327 let bytes = asset_code.as_bytes();
328 write_varint(out, bytes.len() as u64);
329 out.extend_from_slice(bytes);
330 out.push(*scale);
331 out.extend_from_slice(&minor_units.to_le_bytes());
332 }
333 Value::ColorAlpha(rgba) => {
334 out.push(DataType::ColorAlpha.to_byte());
335 out.extend_from_slice(rgba);
336 }
337 Value::BigInt(v) => {
338 out.push(DataType::BigInt.to_byte());
339 out.extend_from_slice(&v.to_le_bytes());
340 }
341 Value::KeyRef(col, key) => {
342 out.push(DataType::KeyRef.to_byte());
343 let col_bytes = col.as_bytes();
344 write_varint(out, col_bytes.len() as u64);
345 out.extend_from_slice(col_bytes);
346 let key_bytes = key.as_bytes();
347 write_varint(out, key_bytes.len() as u64);
348 out.extend_from_slice(key_bytes);
349 }
350 Value::DocRef(col, id) => {
351 out.push(DataType::DocRef.to_byte());
352 let col_bytes = col.as_bytes();
353 write_varint(out, col_bytes.len() as u64);
354 out.extend_from_slice(col_bytes);
355 out.extend_from_slice(&id.to_le_bytes());
356 }
357 Value::TableRef(name) => {
358 out.push(DataType::TableRef.to_byte());
359 let name_bytes = name.as_bytes();
360 write_varint(out, name_bytes.len() as u64);
361 out.extend_from_slice(name_bytes);
362 }
363 Value::PageRef(page_id) => {
364 out.push(DataType::PageRef.to_byte());
365 out.extend_from_slice(&page_id.to_le_bytes());
366 }
367 Value::Secret(bytes) => {
368 out.push(DataType::Secret.to_byte());
369 write_varint(out, bytes.len() as u64);
370 out.extend_from_slice(bytes);
371 }
372 Value::Password(hash) => {
373 out.push(DataType::Password.to_byte());
374 let bytes = hash.as_bytes();
375 write_varint(out, bytes.len() as u64);
376 out.extend_from_slice(bytes);
377 }
378 }
379}
380
381pub fn decode(data: &[u8]) -> Result<(Value, usize), ValueError> {
384 if data.is_empty() {
385 return Err(ValueError::EmptyData);
386 }
387
388 let type_byte = data[0];
389 let mut offset = 1;
390
391 if type_byte == 0 {
393 return Ok((Value::Null, 1));
394 }
395
396 let data_type = DataType::from_byte(type_byte).ok_or(ValueError::InvalidType(type_byte))?;
397
398 let value = match data_type {
399 DataType::Integer => {
400 if data.len() < offset + 8 {
401 return Err(ValueError::TruncatedData);
402 }
403 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
404 offset += 8;
405 Value::Integer(v)
406 }
407 DataType::UnsignedInteger => {
408 if data.len() < offset + 8 {
409 return Err(ValueError::TruncatedData);
410 }
411 let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
412 offset += 8;
413 Value::UnsignedInteger(v)
414 }
415 DataType::Float => {
416 if data.len() < offset + 8 {
417 return Err(ValueError::TruncatedData);
418 }
419 let v = f64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
420 offset += 8;
421 Value::Float(v)
422 }
423 DataType::Text => {
424 let (len, varint_size) = read_varint(&data[offset..])?;
425 offset += varint_size;
426 if data.len() < offset + len as usize {
427 return Err(ValueError::TruncatedData);
428 }
429 let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
430 .map_err(|_| ValueError::InvalidUtf8)?;
431 offset += len as usize;
432 Value::text(s)
433 }
434 DataType::Blob => {
435 let (len, varint_size) = read_varint(&data[offset..])?;
436 offset += varint_size;
437 if data.len() < offset + len as usize {
438 return Err(ValueError::TruncatedData);
439 }
440 let blob = data[offset..offset + len as usize].to_vec();
441 offset += len as usize;
442 Value::Blob(blob)
443 }
444 DataType::Boolean => {
445 if data.len() < offset + 1 {
446 return Err(ValueError::TruncatedData);
447 }
448 let v = data[offset] != 0;
449 offset += 1;
450 Value::Boolean(v)
451 }
452 DataType::Timestamp => {
453 if data.len() < offset + 8 {
454 return Err(ValueError::TruncatedData);
455 }
456 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
457 offset += 8;
458 Value::Timestamp(v)
459 }
460 DataType::Duration => {
461 if data.len() < offset + 8 {
462 return Err(ValueError::TruncatedData);
463 }
464 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
465 offset += 8;
466 Value::Duration(v)
467 }
468 DataType::IpAddr => {
469 if data.len() < offset + 1 {
470 return Err(ValueError::TruncatedData);
471 }
472 let version = data[offset];
473 offset += 1;
474 match version {
475 4 => {
476 if data.len() < offset + 4 {
477 return Err(ValueError::TruncatedData);
478 }
479 let octets: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
480 offset += 4;
481 Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
482 }
483 6 => {
484 if data.len() < offset + 16 {
485 return Err(ValueError::TruncatedData);
486 }
487 let octets: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
488 offset += 16;
489 Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
490 }
491 _ => return Err(ValueError::InvalidIpVersion(version)),
492 }
493 }
494 DataType::MacAddr => {
495 if data.len() < offset + 6 {
496 return Err(ValueError::TruncatedData);
497 }
498 let mac: [u8; 6] = data[offset..offset + 6].try_into().unwrap();
499 offset += 6;
500 Value::MacAddr(mac)
501 }
502 DataType::Vector => {
503 let (len, varint_size) = read_varint(&data[offset..])?;
504 offset += varint_size;
505 let float_count = len as usize;
506 if data.len() < offset + float_count * 4 {
507 return Err(ValueError::TruncatedData);
508 }
509 let mut vec = Vec::with_capacity(float_count);
510 for _ in 0..float_count {
511 let v = f32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
512 offset += 4;
513 vec.push(v);
514 }
515 Value::Vector(vec)
516 }
517 DataType::Json => {
518 let (len, varint_size) = read_varint(&data[offset..])?;
519 offset += varint_size;
520 if data.len() < offset + len as usize {
521 return Err(ValueError::TruncatedData);
522 }
523 let json = data[offset..offset + len as usize].to_vec();
524 offset += len as usize;
525 Value::Json(json)
526 }
527 DataType::Uuid => {
528 if data.len() < offset + 16 {
529 return Err(ValueError::TruncatedData);
530 }
531 let uuid: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
532 offset += 16;
533 Value::Uuid(uuid)
534 }
535 DataType::NodeRef => {
536 let (len, len_bytes) = read_varint(&data[offset..])?;
537 offset += len_bytes;
538 if data.len() < offset + len as usize {
539 return Err(ValueError::TruncatedData);
540 }
541 let node_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
542 offset += len as usize;
543 Value::NodeRef(node_id)
544 }
545 DataType::EdgeRef => {
546 let (len, len_bytes) = read_varint(&data[offset..])?;
547 offset += len_bytes;
548 if data.len() < offset + len as usize {
549 return Err(ValueError::TruncatedData);
550 }
551 let edge_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
552 offset += len as usize;
553 Value::EdgeRef(edge_id)
554 }
555 DataType::VectorRef => {
556 let (len, len_bytes) = read_varint(&data[offset..])?;
557 offset += len_bytes;
558 if data.len() < offset + len as usize + 8 {
559 return Err(ValueError::TruncatedData);
560 }
561 let collection =
562 String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
563 offset += len as usize;
564 let vector_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
565 offset += 8;
566 Value::VectorRef(collection, vector_id)
567 }
568 DataType::RowRef => {
569 let (len, len_bytes) = read_varint(&data[offset..])?;
570 offset += len_bytes;
571 if data.len() < offset + len as usize + 8 {
572 return Err(ValueError::TruncatedData);
573 }
574 let table = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
575 offset += len as usize;
576 let row_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
577 offset += 8;
578 Value::RowRef(table, row_id)
579 }
580 DataType::Color => {
581 if data.len() < offset + 3 {
582 return Err(ValueError::TruncatedData);
583 }
584 let rgb: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
585 offset += 3;
586 Value::Color(rgb)
587 }
588 DataType::Email => {
589 let (len, varint_size) = read_varint(&data[offset..])?;
590 offset += varint_size;
591 if data.len() < offset + len as usize {
592 return Err(ValueError::TruncatedData);
593 }
594 let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
595 .map_err(|_| ValueError::InvalidUtf8)?;
596 offset += len as usize;
597 Value::Email(s)
598 }
599 DataType::Url => {
600 let (len, varint_size) = read_varint(&data[offset..])?;
601 offset += varint_size;
602 if data.len() < offset + len as usize {
603 return Err(ValueError::TruncatedData);
604 }
605 let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
606 .map_err(|_| ValueError::InvalidUtf8)?;
607 offset += len as usize;
608 Value::Url(s)
609 }
610 DataType::Phone => {
611 if data.len() < offset + 8 {
612 return Err(ValueError::TruncatedData);
613 }
614 let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
615 offset += 8;
616 Value::Phone(v)
617 }
618 DataType::Semver => {
619 if data.len() < offset + 4 {
620 return Err(ValueError::TruncatedData);
621 }
622 let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
623 offset += 4;
624 Value::Semver(v)
625 }
626 DataType::Cidr => {
627 if data.len() < offset + 5 {
628 return Err(ValueError::TruncatedData);
629 }
630 let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
631 offset += 4;
632 let prefix = data[offset];
633 offset += 1;
634 Value::Cidr(ip, prefix)
635 }
636 DataType::Date => {
637 if data.len() < offset + 4 {
638 return Err(ValueError::TruncatedData);
639 }
640 let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
641 offset += 4;
642 Value::Date(v)
643 }
644 DataType::Time => {
645 if data.len() < offset + 4 {
646 return Err(ValueError::TruncatedData);
647 }
648 let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
649 offset += 4;
650 Value::Time(v)
651 }
652 DataType::Decimal => {
653 if data.len() < offset + 8 {
654 return Err(ValueError::TruncatedData);
655 }
656 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
657 offset += 8;
658 Value::Decimal(v)
659 }
660 DataType::Enum => {
661 if data.len() < offset + 1 {
662 return Err(ValueError::TruncatedData);
663 }
664 let idx = data[offset];
665 offset += 1;
666 Value::EnumValue(idx)
667 }
668 DataType::Array => {
669 let (len, varint_size) = read_varint(&data[offset..])?;
670 offset += varint_size;
671 let count = len as usize;
672 let mut elements = Vec::with_capacity(count);
673 for _ in 0..count {
674 let (elem, elem_size) = decode(&data[offset..])?;
675 offset += elem_size;
676 elements.push(elem);
677 }
678 Value::Array(elements)
679 }
680 DataType::TimestampMs => {
681 if data.len() < offset + 8 {
682 return Err(ValueError::TruncatedData);
683 }
684 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
685 offset += 8;
686 Value::TimestampMs(v)
687 }
688 DataType::Ipv4 => {
689 if data.len() < offset + 4 {
690 return Err(ValueError::TruncatedData);
691 }
692 let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
693 offset += 4;
694 Value::Ipv4(v)
695 }
696 DataType::Ipv6 => {
697 if data.len() < offset + 16 {
698 return Err(ValueError::TruncatedData);
699 }
700 let bytes: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
701 offset += 16;
702 Value::Ipv6(bytes)
703 }
704 DataType::Subnet => {
705 if data.len() < offset + 8 {
706 return Err(ValueError::TruncatedData);
707 }
708 let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
709 offset += 4;
710 let mask = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
711 offset += 4;
712 Value::Subnet(ip, mask)
713 }
714 DataType::Port => {
715 if data.len() < offset + 2 {
716 return Err(ValueError::TruncatedData);
717 }
718 let v = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap());
719 offset += 2;
720 Value::Port(v)
721 }
722 DataType::Latitude => {
723 if data.len() < offset + 4 {
724 return Err(ValueError::TruncatedData);
725 }
726 let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
727 offset += 4;
728 Value::Latitude(v)
729 }
730 DataType::Longitude => {
731 if data.len() < offset + 4 {
732 return Err(ValueError::TruncatedData);
733 }
734 let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
735 offset += 4;
736 Value::Longitude(v)
737 }
738 DataType::GeoPoint => {
739 if data.len() < offset + 8 {
740 return Err(ValueError::TruncatedData);
741 }
742 let lat = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
743 offset += 4;
744 let lon = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
745 offset += 4;
746 Value::GeoPoint(lat, lon)
747 }
748 DataType::Country2 => {
749 if data.len() < offset + 2 {
750 return Err(ValueError::TruncatedData);
751 }
752 let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
753 offset += 2;
754 Value::Country2(c)
755 }
756 DataType::Country3 => {
757 if data.len() < offset + 3 {
758 return Err(ValueError::TruncatedData);
759 }
760 let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
761 offset += 3;
762 Value::Country3(c)
763 }
764 DataType::Lang2 => {
765 if data.len() < offset + 2 {
766 return Err(ValueError::TruncatedData);
767 }
768 let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
769 offset += 2;
770 Value::Lang2(c)
771 }
772 DataType::Lang5 => {
773 if data.len() < offset + 5 {
774 return Err(ValueError::TruncatedData);
775 }
776 let c: [u8; 5] = data[offset..offset + 5].try_into().unwrap();
777 offset += 5;
778 Value::Lang5(c)
779 }
780 DataType::Currency => {
781 if data.len() < offset + 3 {
782 return Err(ValueError::TruncatedData);
783 }
784 let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
785 offset += 3;
786 Value::Currency(c)
787 }
788 DataType::AssetCode => {
789 let (len, len_bytes) = read_varint(&data[offset..])?;
790 offset += len_bytes;
791 if data.len() < offset + len as usize {
792 return Err(ValueError::TruncatedData);
793 }
794 let code = String::from_utf8(data[offset..offset + len as usize].to_vec())
795 .map_err(|_| ValueError::InvalidUtf8)?;
796 offset += len as usize;
797 Value::AssetCode(code)
798 }
799 DataType::Money => {
800 let (len, len_bytes) = read_varint(&data[offset..])?;
801 offset += len_bytes;
802 if data.len() < offset + len as usize + 1 + 8 {
803 return Err(ValueError::TruncatedData);
804 }
805 let asset_code = String::from_utf8(data[offset..offset + len as usize].to_vec())
806 .map_err(|_| ValueError::InvalidUtf8)?;
807 offset += len as usize;
808 let scale = data[offset];
809 offset += 1;
810 let minor_units = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
811 offset += 8;
812 Value::Money {
813 asset_code,
814 minor_units,
815 scale,
816 }
817 }
818 DataType::ColorAlpha => {
819 if data.len() < offset + 4 {
820 return Err(ValueError::TruncatedData);
821 }
822 let rgba: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
823 offset += 4;
824 Value::ColorAlpha(rgba)
825 }
826 DataType::BigInt => {
827 if data.len() < offset + 8 {
828 return Err(ValueError::TruncatedData);
829 }
830 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
831 offset += 8;
832 Value::BigInt(v)
833 }
834 DataType::KeyRef => {
835 let (col_len, col_varint) = read_varint(&data[offset..])?;
836 offset += col_varint;
837 if data.len() < offset + col_len as usize {
838 return Err(ValueError::TruncatedData);
839 }
840 let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
841 .map_err(|_| ValueError::InvalidUtf8)?;
842 offset += col_len as usize;
843 let (key_len, key_varint) = read_varint(&data[offset..])?;
844 offset += key_varint;
845 if data.len() < offset + key_len as usize {
846 return Err(ValueError::TruncatedData);
847 }
848 let key = String::from_utf8(data[offset..offset + key_len as usize].to_vec())
849 .map_err(|_| ValueError::InvalidUtf8)?;
850 offset += key_len as usize;
851 Value::KeyRef(col, key)
852 }
853 DataType::DocRef => {
854 let (col_len, col_varint) = read_varint(&data[offset..])?;
855 offset += col_varint;
856 if data.len() < offset + col_len as usize + 8 {
857 return Err(ValueError::TruncatedData);
858 }
859 let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
860 .map_err(|_| ValueError::InvalidUtf8)?;
861 offset += col_len as usize;
862 let id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
863 offset += 8;
864 Value::DocRef(col, id)
865 }
866 DataType::TableRef => {
867 let (len, varint_size) = read_varint(&data[offset..])?;
868 offset += varint_size;
869 if data.len() < offset + len as usize {
870 return Err(ValueError::TruncatedData);
871 }
872 let name = String::from_utf8(data[offset..offset + len as usize].to_vec())
873 .map_err(|_| ValueError::InvalidUtf8)?;
874 offset += len as usize;
875 Value::TableRef(name)
876 }
877 DataType::PageRef => {
878 if data.len() < offset + 4 {
879 return Err(ValueError::TruncatedData);
880 }
881 let page_id = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
882 offset += 4;
883 Value::PageRef(page_id)
884 }
885 DataType::Secret => {
886 let (len, varint_size) = read_varint(&data[offset..])?;
887 offset += varint_size;
888 if data.len() < offset + len as usize {
889 return Err(ValueError::TruncatedData);
890 }
891 let bytes = data[offset..offset + len as usize].to_vec();
892 offset += len as usize;
893 Value::Secret(bytes)
894 }
895 DataType::Password => {
896 let (len, varint_size) = read_varint(&data[offset..])?;
897 offset += varint_size;
898 if data.len() < offset + len as usize {
899 return Err(ValueError::TruncatedData);
900 }
901 let hash = String::from_utf8(data[offset..offset + len as usize].to_vec())
902 .map_err(|_| ValueError::InvalidUtf8)?;
903 offset += len as usize;
904 Value::Password(hash)
905 }
906 DataType::Nullable => {
907 Value::Null
909 }
910 DataType::Unknown => {
911 Value::Null
915 }
916 DataType::TextZstd => {
920 let (orig_len, vs1) = read_varint(&data[offset..])?;
921 offset += vs1;
922 let (comp_len, vs2) = read_varint(&data[offset..])?;
923 offset += vs2;
924 if data.len() < offset + comp_len as usize {
925 return Err(ValueError::TruncatedData);
926 }
927 let compressed = &data[offset..offset + comp_len as usize];
928 let mut decompressed = vec![0u8; orig_len as usize];
929 zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
930 .map_err(|_| ValueError::InvalidUtf8)?;
931 offset += comp_len as usize;
932 let s = String::from_utf8(decompressed).map_err(|_| ValueError::InvalidUtf8)?;
933 Value::text(s)
934 }
935 DataType::BlobZstd => {
937 let (orig_len, vs1) = read_varint(&data[offset..])?;
938 offset += vs1;
939 let (comp_len, vs2) = read_varint(&data[offset..])?;
940 offset += vs2;
941 if data.len() < offset + comp_len as usize {
942 return Err(ValueError::TruncatedData);
943 }
944 let compressed = &data[offset..offset + comp_len as usize];
945 let mut decompressed = vec![0u8; orig_len as usize];
946 zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
947 .map_err(|_| ValueError::InvalidUtf8)?;
948 offset += comp_len as usize;
949 Value::Blob(decompressed)
950 }
951 };
952
953 Ok((value, offset))
954}
955
#[cfg(test)]
mod tests {
    use super::*;

    // Encodes `value` into a fresh buffer and returns it.
    fn wire(value: &Value) -> Vec<u8> {
        let mut buf = Vec::new();
        encode(value, &mut buf);
        buf
    }

    // Pins the exact byte layout of a few representative variants so that an
    // accidental format change is caught.
    #[test]
    fn pinned_bytes() {
        assert_eq!(wire(&Value::Null), [0x00], "Value::Null layout drifted");

        assert_eq!(
            wire(&Value::Integer(-1)),
            [0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF],
            "Value::Integer layout drifted"
        );

        assert_eq!(
            wire(&Value::text("hi")),
            [0x04, 0x02, b'h', b'i'],
            "Value::Text layout drifted"
        );

        assert_eq!(
            wire(&Value::Boolean(true)),
            [0x06, 0x01],
            "Value::Boolean layout drifted"
        );

        assert_eq!(
            wire(&Value::Blob(vec![0x01, 0x02, 0x03])),
            [0x05, 0x03, 0x01, 0x02, 0x03],
            "Value::Blob layout drifted"
        );
    }

    // `type_tag` must agree with `DataType::to_byte`, and `type_for_tag` must
    // map the tag back to the same kind; Null is the special tag 0.
    #[test]
    fn type_tag_matches_data_type_byte() {
        let samples = [
            Value::Null,
            Value::Integer(0),
            Value::UnsignedInteger(0),
            Value::Float(0.0),
            Value::text(""),
            Value::Blob(Vec::new()),
            Value::Boolean(false),
            Value::Timestamp(0),
            Value::Duration(0),
            Value::Uuid([0; 16]),
        ];
        for sample in &samples {
            let tag = type_tag(sample);
            match sample {
                Value::Null => assert_eq!(tag, 0),
                _ => {
                    assert_eq!(tag, sample.data_type().to_byte());
                    let kind = type_for_tag(tag).expect("registered tag");
                    assert_eq!(kind, sample.data_type());
                }
            }
        }
    }

    // A tag that no `DataType` claims must be rejected, not guessed at.
    #[test]
    fn rejects_unknown_type_tag() {
        let err = decode(&[0xFFu8]).expect_err("unknown tag must error");
        assert!(matches!(err, ValueError::InvalidType(0xFF)));
    }

    // Empty input, a short fixed-width payload and a short length-prefixed
    // payload must each produce the matching error.
    #[test]
    fn rejects_truncated_buffer() {
        assert!(matches!(decode(&[]), Err(ValueError::EmptyData)));

        // Integer needs 8 payload bytes; supply only 3.
        let mut short_int = vec![DataType::Integer.to_byte()];
        short_int.extend_from_slice(&[0x01, 0x02, 0x03]);
        assert!(matches!(decode(&short_int), Err(ValueError::TruncatedData)));

        // Text announces 5 bytes but only 2 follow.
        let mut short_text = vec![DataType::Text.to_byte()];
        write_varint(&mut short_text, 5);
        short_text.extend_from_slice(b"ab");
        assert!(matches!(decode(&short_text), Err(ValueError::TruncatedData)));
    }

    // encode followed by decode must reproduce the value and consume exactly
    // the bytes that were written.
    #[test]
    fn round_trip_canonical_variants() {
        let cases = [
            Value::Null,
            Value::Integer(-12345),
            Value::text("hello"),
            Value::Boolean(true),
            Value::Blob(vec![1, 2, 3, 4, 5]),
        ];
        for original in &cases {
            let bytes = wire(original);
            let (recovered, consumed) = decode(&bytes).expect("decode");
            assert_eq!(consumed, bytes.len());
            assert_eq!(original, &recovered);
        }
    }
}