1use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
36
37use super::types::{read_varint, write_varint, DataType, Value, ValueError};
38
/// Alias for [`DataType`]; this is the kind returned by [`type_for_tag`].
pub type ValueKind = DataType;
44
45#[inline]
51pub fn type_tag(value: &Value) -> u8 {
52 match value {
53 Value::Null => 0,
54 other => other.data_type().to_byte(),
55 }
56}
57
58#[inline]
61pub fn type_for_tag(tag: u8) -> Option<ValueKind> {
62 if tag == 0 {
63 Some(DataType::Nullable)
64 } else {
65 DataType::from_byte(tag)
66 }
67}
68
/// Payload size in bytes above which `encode` attempts zstd compression of
/// `Text` and `Blob` values (payloads of exactly this size stay uncompressed).
pub(super) const TOAST_THRESHOLD: usize = 2048;
73
/// Compression level passed to `zstd::bulk::compress` when `encode`
/// compresses an oversized `Text` or `Blob` payload.
pub(super) const TOAST_ZSTD_LEVEL: i32 = 3;
77
78pub fn encode(value: &Value, out: &mut Vec<u8>) {
83 match value {
84 Value::Null => {
85 out.push(0); }
87 Value::Integer(v) => {
88 out.push(DataType::Integer.to_byte());
89 out.extend_from_slice(&v.to_le_bytes());
90 }
91 Value::UnsignedInteger(v) => {
92 out.push(DataType::UnsignedInteger.to_byte());
93 out.extend_from_slice(&v.to_le_bytes());
94 }
95 Value::Float(v) => {
96 out.push(DataType::Float.to_byte());
97 out.extend_from_slice(&v.to_le_bytes());
98 }
99 Value::Text(s) => {
100 let bytes = s.as_bytes();
101 if bytes.len() > TOAST_THRESHOLD {
106 if let Ok(compressed) = zstd::bulk::compress(bytes, TOAST_ZSTD_LEVEL) {
107 if compressed.len() < bytes.len() {
108 out.push(DataType::TextZstd.to_byte());
109 write_varint(out, bytes.len() as u64);
111 write_varint(out, compressed.len() as u64);
112 out.extend_from_slice(&compressed);
113 return;
114 }
115 }
116 }
117 out.push(DataType::Text.to_byte());
118 write_varint(out, bytes.len() as u64);
119 out.extend_from_slice(bytes);
120 }
121 Value::Blob(data) => {
122 if data.len() > TOAST_THRESHOLD {
124 if let Ok(compressed) = zstd::bulk::compress(data, TOAST_ZSTD_LEVEL) {
125 if compressed.len() < data.len() {
126 out.push(DataType::BlobZstd.to_byte());
127 write_varint(out, data.len() as u64);
128 write_varint(out, compressed.len() as u64);
129 out.extend_from_slice(&compressed);
130 return;
131 }
132 }
133 }
134 out.push(DataType::Blob.to_byte());
135 write_varint(out, data.len() as u64);
136 out.extend_from_slice(data);
137 }
138 Value::Boolean(v) => {
139 out.push(DataType::Boolean.to_byte());
140 out.push(if *v { 1 } else { 0 });
141 }
142 Value::Timestamp(v) => {
143 out.push(DataType::Timestamp.to_byte());
144 out.extend_from_slice(&v.to_le_bytes());
145 }
146 Value::Duration(v) => {
147 out.push(DataType::Duration.to_byte());
148 out.extend_from_slice(&v.to_le_bytes());
149 }
150 Value::IpAddr(addr) => {
151 out.push(DataType::IpAddr.to_byte());
152 match addr {
153 IpAddr::V4(v4) => {
154 out.push(4); out.extend_from_slice(&v4.octets());
156 }
157 IpAddr::V6(v6) => {
158 out.push(6); out.extend_from_slice(&v6.octets());
160 }
161 }
162 }
163 Value::MacAddr(mac) => {
164 out.push(DataType::MacAddr.to_byte());
165 out.extend_from_slice(mac);
166 }
167 Value::Vector(vec) => {
168 out.push(DataType::Vector.to_byte());
169 write_varint(out, vec.len() as u64);
170 for v in vec {
171 out.extend_from_slice(&v.to_le_bytes());
172 }
173 }
174 Value::Json(data) => {
175 out.push(DataType::Json.to_byte());
176 write_varint(out, data.len() as u64);
177 out.extend_from_slice(data);
178 }
179 Value::Uuid(uuid) => {
180 out.push(DataType::Uuid.to_byte());
181 out.extend_from_slice(uuid);
182 }
183 Value::NodeRef(node_id) => {
184 out.push(DataType::NodeRef.to_byte());
185 let bytes = node_id.as_bytes();
186 write_varint(out, bytes.len() as u64);
187 out.extend_from_slice(bytes);
188 }
189 Value::EdgeRef(edge_id) => {
190 out.push(DataType::EdgeRef.to_byte());
191 let bytes = edge_id.as_bytes();
192 write_varint(out, bytes.len() as u64);
193 out.extend_from_slice(bytes);
194 }
195 Value::VectorRef(collection, vector_id) => {
196 out.push(DataType::VectorRef.to_byte());
197 let coll_bytes = collection.as_bytes();
198 write_varint(out, coll_bytes.len() as u64);
199 out.extend_from_slice(coll_bytes);
200 out.extend_from_slice(&vector_id.to_le_bytes());
201 }
202 Value::RowRef(table, row_id) => {
203 out.push(DataType::RowRef.to_byte());
204 let table_bytes = table.as_bytes();
205 write_varint(out, table_bytes.len() as u64);
206 out.extend_from_slice(table_bytes);
207 out.extend_from_slice(&row_id.to_le_bytes());
208 }
209 Value::Color(rgb) => {
210 out.push(DataType::Color.to_byte());
211 out.extend_from_slice(rgb);
212 }
213 Value::Email(s) => {
214 out.push(DataType::Email.to_byte());
215 let bytes = s.as_bytes();
216 write_varint(out, bytes.len() as u64);
217 out.extend_from_slice(bytes);
218 }
219 Value::Url(s) => {
220 out.push(DataType::Url.to_byte());
221 let bytes = s.as_bytes();
222 write_varint(out, bytes.len() as u64);
223 out.extend_from_slice(bytes);
224 }
225 Value::Phone(n) => {
226 out.push(DataType::Phone.to_byte());
227 out.extend_from_slice(&n.to_le_bytes());
228 }
229 Value::Semver(packed) => {
230 out.push(DataType::Semver.to_byte());
231 out.extend_from_slice(&packed.to_le_bytes());
232 }
233 Value::Cidr(ip, prefix) => {
234 out.push(DataType::Cidr.to_byte());
235 out.extend_from_slice(&ip.to_le_bytes());
236 out.push(*prefix);
237 }
238 Value::Date(days) => {
239 out.push(DataType::Date.to_byte());
240 out.extend_from_slice(&days.to_le_bytes());
241 }
242 Value::Time(ms) => {
243 out.push(DataType::Time.to_byte());
244 out.extend_from_slice(&ms.to_le_bytes());
245 }
246 Value::Decimal(v) => {
247 out.push(DataType::Decimal.to_byte());
248 out.extend_from_slice(&v.to_le_bytes());
249 }
250 Value::EnumValue(idx) => {
251 out.push(DataType::Enum.to_byte());
252 out.push(*idx);
253 }
254 Value::Array(elements) => {
255 out.push(DataType::Array.to_byte());
256 write_varint(out, elements.len() as u64);
257 for elem in elements {
258 encode(elem, out);
259 }
260 }
261 Value::TimestampMs(v) => {
262 out.push(DataType::TimestampMs.to_byte());
263 out.extend_from_slice(&v.to_le_bytes());
264 }
265 Value::Ipv4(v) => {
266 out.push(DataType::Ipv4.to_byte());
267 out.extend_from_slice(&v.to_le_bytes());
268 }
269 Value::Ipv6(bytes) => {
270 out.push(DataType::Ipv6.to_byte());
271 out.extend_from_slice(bytes);
272 }
273 Value::Subnet(ip, mask) => {
274 out.push(DataType::Subnet.to_byte());
275 out.extend_from_slice(&ip.to_le_bytes());
276 out.extend_from_slice(&mask.to_le_bytes());
277 }
278 Value::Port(v) => {
279 out.push(DataType::Port.to_byte());
280 out.extend_from_slice(&v.to_le_bytes());
281 }
282 Value::Latitude(v) => {
283 out.push(DataType::Latitude.to_byte());
284 out.extend_from_slice(&v.to_le_bytes());
285 }
286 Value::Longitude(v) => {
287 out.push(DataType::Longitude.to_byte());
288 out.extend_from_slice(&v.to_le_bytes());
289 }
290 Value::GeoPoint(lat, lon) => {
291 out.push(DataType::GeoPoint.to_byte());
292 out.extend_from_slice(&lat.to_le_bytes());
293 out.extend_from_slice(&lon.to_le_bytes());
294 }
295 Value::Country2(c) => {
296 out.push(DataType::Country2.to_byte());
297 out.extend_from_slice(c);
298 }
299 Value::Country3(c) => {
300 out.push(DataType::Country3.to_byte());
301 out.extend_from_slice(c);
302 }
303 Value::Lang2(c) => {
304 out.push(DataType::Lang2.to_byte());
305 out.extend_from_slice(c);
306 }
307 Value::Lang5(c) => {
308 out.push(DataType::Lang5.to_byte());
309 out.extend_from_slice(c);
310 }
311 Value::Currency(c) => {
312 out.push(DataType::Currency.to_byte());
313 out.extend_from_slice(c);
314 }
315 Value::AssetCode(code) => {
316 out.push(DataType::AssetCode.to_byte());
317 let bytes = code.as_bytes();
318 write_varint(out, bytes.len() as u64);
319 out.extend_from_slice(bytes);
320 }
321 Value::Money {
322 asset_code,
323 minor_units,
324 scale,
325 } => {
326 out.push(DataType::Money.to_byte());
327 let bytes = asset_code.as_bytes();
328 write_varint(out, bytes.len() as u64);
329 out.extend_from_slice(bytes);
330 out.push(*scale);
331 out.extend_from_slice(&minor_units.to_le_bytes());
332 }
333 Value::ColorAlpha(rgba) => {
334 out.push(DataType::ColorAlpha.to_byte());
335 out.extend_from_slice(rgba);
336 }
337 Value::BigInt(v) => {
338 out.push(DataType::BigInt.to_byte());
339 out.extend_from_slice(&v.to_le_bytes());
340 }
341 Value::KeyRef(col, key) => {
342 out.push(DataType::KeyRef.to_byte());
343 let col_bytes = col.as_bytes();
344 write_varint(out, col_bytes.len() as u64);
345 out.extend_from_slice(col_bytes);
346 let key_bytes = key.as_bytes();
347 write_varint(out, key_bytes.len() as u64);
348 out.extend_from_slice(key_bytes);
349 }
350 Value::DocRef(col, id) => {
351 out.push(DataType::DocRef.to_byte());
352 let col_bytes = col.as_bytes();
353 write_varint(out, col_bytes.len() as u64);
354 out.extend_from_slice(col_bytes);
355 out.extend_from_slice(&id.to_le_bytes());
356 }
357 Value::TableRef(name) => {
358 out.push(DataType::TableRef.to_byte());
359 let name_bytes = name.as_bytes();
360 write_varint(out, name_bytes.len() as u64);
361 out.extend_from_slice(name_bytes);
362 }
363 Value::PageRef(page_id) => {
364 out.push(DataType::PageRef.to_byte());
365 out.extend_from_slice(&page_id.to_le_bytes());
366 }
367 Value::Secret(bytes) => {
368 out.push(DataType::Secret.to_byte());
369 write_varint(out, bytes.len() as u64);
370 out.extend_from_slice(bytes);
371 }
372 Value::Password(hash) => {
373 out.push(DataType::Password.to_byte());
374 let bytes = hash.as_bytes();
375 write_varint(out, bytes.len() as u64);
376 out.extend_from_slice(bytes);
377 }
378 }
379}
380
381pub fn decode(data: &[u8]) -> Result<(Value, usize), ValueError> {
384 if data.is_empty() {
385 return Err(ValueError::EmptyData);
386 }
387
388 let type_byte = data[0];
389 let mut offset = 1;
390
391 if type_byte == 0 {
393 return Ok((Value::Null, 1));
394 }
395
396 let data_type = DataType::from_byte(type_byte).ok_or(ValueError::InvalidType(type_byte))?;
397
398 let value = match data_type {
399 DataType::Integer => {
400 if data.len() < offset + 8 {
401 return Err(ValueError::TruncatedData);
402 }
403 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
404 offset += 8;
405 Value::Integer(v)
406 }
407 DataType::UnsignedInteger => {
408 if data.len() < offset + 8 {
409 return Err(ValueError::TruncatedData);
410 }
411 let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
412 offset += 8;
413 Value::UnsignedInteger(v)
414 }
415 DataType::Float => {
416 if data.len() < offset + 8 {
417 return Err(ValueError::TruncatedData);
418 }
419 let v = f64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
420 offset += 8;
421 Value::Float(v)
422 }
423 DataType::Text => {
424 let (len, varint_size) = read_varint(&data[offset..])?;
425 offset += varint_size;
426 if data.len() < offset + len as usize {
427 return Err(ValueError::TruncatedData);
428 }
429 let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
430 .map_err(|_| ValueError::InvalidUtf8)?;
431 offset += len as usize;
432 Value::text(s)
433 }
434 DataType::Blob => {
435 let (len, varint_size) = read_varint(&data[offset..])?;
436 offset += varint_size;
437 if data.len() < offset + len as usize {
438 return Err(ValueError::TruncatedData);
439 }
440 let blob = data[offset..offset + len as usize].to_vec();
441 offset += len as usize;
442 Value::Blob(blob)
443 }
444 DataType::Boolean => {
445 if data.len() < offset + 1 {
446 return Err(ValueError::TruncatedData);
447 }
448 let v = data[offset] != 0;
449 offset += 1;
450 Value::Boolean(v)
451 }
452 DataType::Timestamp => {
453 if data.len() < offset + 8 {
454 return Err(ValueError::TruncatedData);
455 }
456 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
457 offset += 8;
458 Value::Timestamp(v)
459 }
460 DataType::Duration => {
461 if data.len() < offset + 8 {
462 return Err(ValueError::TruncatedData);
463 }
464 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
465 offset += 8;
466 Value::Duration(v)
467 }
468 DataType::IpAddr => {
469 if data.len() < offset + 1 {
470 return Err(ValueError::TruncatedData);
471 }
472 let version = data[offset];
473 offset += 1;
474 match version {
475 4 => {
476 if data.len() < offset + 4 {
477 return Err(ValueError::TruncatedData);
478 }
479 let octets: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
480 offset += 4;
481 Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
482 }
483 6 => {
484 if data.len() < offset + 16 {
485 return Err(ValueError::TruncatedData);
486 }
487 let octets: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
488 offset += 16;
489 Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
490 }
491 _ => return Err(ValueError::InvalidIpVersion(version)),
492 }
493 }
494 DataType::MacAddr => {
495 if data.len() < offset + 6 {
496 return Err(ValueError::TruncatedData);
497 }
498 let mac: [u8; 6] = data[offset..offset + 6].try_into().unwrap();
499 offset += 6;
500 Value::MacAddr(mac)
501 }
502 DataType::Vector => {
503 let (len, varint_size) = read_varint(&data[offset..])?;
504 offset += varint_size;
505 let float_count = len as usize;
506 if data.len() < offset + float_count * 4 {
507 return Err(ValueError::TruncatedData);
508 }
509 let mut vec = Vec::with_capacity(float_count);
510 for _ in 0..float_count {
511 let v = f32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
512 offset += 4;
513 vec.push(v);
514 }
515 Value::Vector(vec)
516 }
517 DataType::Json => {
518 let (len, varint_size) = read_varint(&data[offset..])?;
519 offset += varint_size;
520 if data.len() < offset + len as usize {
521 return Err(ValueError::TruncatedData);
522 }
523 let json = data[offset..offset + len as usize].to_vec();
524 offset += len as usize;
525 Value::Json(json)
526 }
527 DataType::Uuid => {
528 if data.len() < offset + 16 {
529 return Err(ValueError::TruncatedData);
530 }
531 let uuid: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
532 offset += 16;
533 Value::Uuid(uuid)
534 }
535 DataType::NodeRef => {
536 let (len, len_bytes) = read_varint(&data[offset..])?;
537 offset += len_bytes;
538 if data.len() < offset + len as usize {
539 return Err(ValueError::TruncatedData);
540 }
541 let node_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
542 offset += len as usize;
543 Value::NodeRef(node_id)
544 }
545 DataType::EdgeRef => {
546 let (len, len_bytes) = read_varint(&data[offset..])?;
547 offset += len_bytes;
548 if data.len() < offset + len as usize {
549 return Err(ValueError::TruncatedData);
550 }
551 let edge_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
552 offset += len as usize;
553 Value::EdgeRef(edge_id)
554 }
555 DataType::VectorRef => {
556 let (len, len_bytes) = read_varint(&data[offset..])?;
557 offset += len_bytes;
558 if data.len() < offset + len as usize + 8 {
559 return Err(ValueError::TruncatedData);
560 }
561 let collection =
562 String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
563 offset += len as usize;
564 let vector_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
565 offset += 8;
566 Value::VectorRef(collection, vector_id)
567 }
568 DataType::RowRef => {
569 let (len, len_bytes) = read_varint(&data[offset..])?;
570 offset += len_bytes;
571 if data.len() < offset + len as usize + 8 {
572 return Err(ValueError::TruncatedData);
573 }
574 let table = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
575 offset += len as usize;
576 let row_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
577 offset += 8;
578 Value::RowRef(table, row_id)
579 }
580 DataType::Color => {
581 if data.len() < offset + 3 {
582 return Err(ValueError::TruncatedData);
583 }
584 let rgb: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
585 offset += 3;
586 Value::Color(rgb)
587 }
588 DataType::Email => {
589 let (len, varint_size) = read_varint(&data[offset..])?;
590 offset += varint_size;
591 if data.len() < offset + len as usize {
592 return Err(ValueError::TruncatedData);
593 }
594 let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
595 .map_err(|_| ValueError::InvalidUtf8)?;
596 offset += len as usize;
597 Value::Email(s)
598 }
599 DataType::Url => {
600 let (len, varint_size) = read_varint(&data[offset..])?;
601 offset += varint_size;
602 if data.len() < offset + len as usize {
603 return Err(ValueError::TruncatedData);
604 }
605 let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
606 .map_err(|_| ValueError::InvalidUtf8)?;
607 offset += len as usize;
608 Value::Url(s)
609 }
610 DataType::Phone => {
611 if data.len() < offset + 8 {
612 return Err(ValueError::TruncatedData);
613 }
614 let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
615 offset += 8;
616 Value::Phone(v)
617 }
618 DataType::Semver => {
619 if data.len() < offset + 4 {
620 return Err(ValueError::TruncatedData);
621 }
622 let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
623 offset += 4;
624 Value::Semver(v)
625 }
626 DataType::Cidr => {
627 if data.len() < offset + 5 {
628 return Err(ValueError::TruncatedData);
629 }
630 let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
631 offset += 4;
632 let prefix = data[offset];
633 offset += 1;
634 Value::Cidr(ip, prefix)
635 }
636 DataType::Date => {
637 if data.len() < offset + 4 {
638 return Err(ValueError::TruncatedData);
639 }
640 let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
641 offset += 4;
642 Value::Date(v)
643 }
644 DataType::Time => {
645 if data.len() < offset + 4 {
646 return Err(ValueError::TruncatedData);
647 }
648 let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
649 offset += 4;
650 Value::Time(v)
651 }
652 DataType::Decimal => {
653 if data.len() < offset + 8 {
654 return Err(ValueError::TruncatedData);
655 }
656 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
657 offset += 8;
658 Value::Decimal(v)
659 }
660 DataType::Enum => {
661 if data.len() < offset + 1 {
662 return Err(ValueError::TruncatedData);
663 }
664 let idx = data[offset];
665 offset += 1;
666 Value::EnumValue(idx)
667 }
668 DataType::Array => {
669 let (len, varint_size) = read_varint(&data[offset..])?;
670 offset += varint_size;
671 let count = len as usize;
672 let mut elements = Vec::with_capacity(count);
673 for _ in 0..count {
674 let (elem, elem_size) = decode(&data[offset..])?;
675 offset += elem_size;
676 elements.push(elem);
677 }
678 Value::Array(elements)
679 }
680 DataType::TimestampMs => {
681 if data.len() < offset + 8 {
682 return Err(ValueError::TruncatedData);
683 }
684 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
685 offset += 8;
686 Value::TimestampMs(v)
687 }
688 DataType::Ipv4 => {
689 if data.len() < offset + 4 {
690 return Err(ValueError::TruncatedData);
691 }
692 let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
693 offset += 4;
694 Value::Ipv4(v)
695 }
696 DataType::Ipv6 => {
697 if data.len() < offset + 16 {
698 return Err(ValueError::TruncatedData);
699 }
700 let bytes: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
701 offset += 16;
702 Value::Ipv6(bytes)
703 }
704 DataType::Subnet => {
705 if data.len() < offset + 8 {
706 return Err(ValueError::TruncatedData);
707 }
708 let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
709 offset += 4;
710 let mask = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
711 offset += 4;
712 Value::Subnet(ip, mask)
713 }
714 DataType::Port => {
715 if data.len() < offset + 2 {
716 return Err(ValueError::TruncatedData);
717 }
718 let v = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap());
719 offset += 2;
720 Value::Port(v)
721 }
722 DataType::Latitude => {
723 if data.len() < offset + 4 {
724 return Err(ValueError::TruncatedData);
725 }
726 let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
727 offset += 4;
728 Value::Latitude(v)
729 }
730 DataType::Longitude => {
731 if data.len() < offset + 4 {
732 return Err(ValueError::TruncatedData);
733 }
734 let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
735 offset += 4;
736 Value::Longitude(v)
737 }
738 DataType::GeoPoint => {
739 if data.len() < offset + 8 {
740 return Err(ValueError::TruncatedData);
741 }
742 let lat = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
743 offset += 4;
744 let lon = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
745 offset += 4;
746 Value::GeoPoint(lat, lon)
747 }
748 DataType::Country2 => {
749 if data.len() < offset + 2 {
750 return Err(ValueError::TruncatedData);
751 }
752 let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
753 offset += 2;
754 Value::Country2(c)
755 }
756 DataType::Country3 => {
757 if data.len() < offset + 3 {
758 return Err(ValueError::TruncatedData);
759 }
760 let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
761 offset += 3;
762 Value::Country3(c)
763 }
764 DataType::Lang2 => {
765 if data.len() < offset + 2 {
766 return Err(ValueError::TruncatedData);
767 }
768 let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
769 offset += 2;
770 Value::Lang2(c)
771 }
772 DataType::Lang5 => {
773 if data.len() < offset + 5 {
774 return Err(ValueError::TruncatedData);
775 }
776 let c: [u8; 5] = data[offset..offset + 5].try_into().unwrap();
777 offset += 5;
778 Value::Lang5(c)
779 }
780 DataType::Currency => {
781 if data.len() < offset + 3 {
782 return Err(ValueError::TruncatedData);
783 }
784 let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
785 offset += 3;
786 Value::Currency(c)
787 }
788 DataType::AssetCode => {
789 let (len, len_bytes) = read_varint(&data[offset..])?;
790 offset += len_bytes;
791 if data.len() < offset + len as usize {
792 return Err(ValueError::TruncatedData);
793 }
794 let code = String::from_utf8(data[offset..offset + len as usize].to_vec())
795 .map_err(|_| ValueError::InvalidUtf8)?;
796 offset += len as usize;
797 Value::AssetCode(code)
798 }
799 DataType::Money => {
800 let (len, len_bytes) = read_varint(&data[offset..])?;
801 offset += len_bytes;
802 if data.len() < offset + len as usize + 1 + 8 {
803 return Err(ValueError::TruncatedData);
804 }
805 let asset_code = String::from_utf8(data[offset..offset + len as usize].to_vec())
806 .map_err(|_| ValueError::InvalidUtf8)?;
807 offset += len as usize;
808 let scale = data[offset];
809 offset += 1;
810 let minor_units = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
811 offset += 8;
812 Value::Money {
813 asset_code,
814 minor_units,
815 scale,
816 }
817 }
818 DataType::ColorAlpha => {
819 if data.len() < offset + 4 {
820 return Err(ValueError::TruncatedData);
821 }
822 let rgba: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
823 offset += 4;
824 Value::ColorAlpha(rgba)
825 }
826 DataType::BigInt => {
827 if data.len() < offset + 8 {
828 return Err(ValueError::TruncatedData);
829 }
830 let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
831 offset += 8;
832 Value::BigInt(v)
833 }
834 DataType::KeyRef => {
835 let (col_len, col_varint) = read_varint(&data[offset..])?;
836 offset += col_varint;
837 if data.len() < offset + col_len as usize {
838 return Err(ValueError::TruncatedData);
839 }
840 let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
841 .map_err(|_| ValueError::InvalidUtf8)?;
842 offset += col_len as usize;
843 let (key_len, key_varint) = read_varint(&data[offset..])?;
844 offset += key_varint;
845 if data.len() < offset + key_len as usize {
846 return Err(ValueError::TruncatedData);
847 }
848 let key = String::from_utf8(data[offset..offset + key_len as usize].to_vec())
849 .map_err(|_| ValueError::InvalidUtf8)?;
850 offset += key_len as usize;
851 Value::KeyRef(col, key)
852 }
853 DataType::DocRef => {
854 let (col_len, col_varint) = read_varint(&data[offset..])?;
855 offset += col_varint;
856 if data.len() < offset + col_len as usize + 8 {
857 return Err(ValueError::TruncatedData);
858 }
859 let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
860 .map_err(|_| ValueError::InvalidUtf8)?;
861 offset += col_len as usize;
862 let id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
863 offset += 8;
864 Value::DocRef(col, id)
865 }
866 DataType::TableRef => {
867 let (len, varint_size) = read_varint(&data[offset..])?;
868 offset += varint_size;
869 if data.len() < offset + len as usize {
870 return Err(ValueError::TruncatedData);
871 }
872 let name = String::from_utf8(data[offset..offset + len as usize].to_vec())
873 .map_err(|_| ValueError::InvalidUtf8)?;
874 offset += len as usize;
875 Value::TableRef(name)
876 }
877 DataType::PageRef => {
878 if data.len() < offset + 4 {
879 return Err(ValueError::TruncatedData);
880 }
881 let page_id = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
882 offset += 4;
883 Value::PageRef(page_id)
884 }
885 DataType::Secret => {
886 let (len, varint_size) = read_varint(&data[offset..])?;
887 offset += varint_size;
888 if data.len() < offset + len as usize {
889 return Err(ValueError::TruncatedData);
890 }
891 let bytes = data[offset..offset + len as usize].to_vec();
892 offset += len as usize;
893 Value::Secret(bytes)
894 }
895 DataType::Password => {
896 let (len, varint_size) = read_varint(&data[offset..])?;
897 offset += varint_size;
898 if data.len() < offset + len as usize {
899 return Err(ValueError::TruncatedData);
900 }
901 let hash = String::from_utf8(data[offset..offset + len as usize].to_vec())
902 .map_err(|_| ValueError::InvalidUtf8)?;
903 offset += len as usize;
904 Value::Password(hash)
905 }
906 DataType::Nullable => {
907 Value::Null
909 }
910 DataType::Unknown => {
911 Value::Null
915 }
916 DataType::TextZstd => {
920 let (orig_len, vs1) = read_varint(&data[offset..])?;
921 offset += vs1;
922 let (comp_len, vs2) = read_varint(&data[offset..])?;
923 offset += vs2;
924 if data.len() < offset + comp_len as usize {
925 return Err(ValueError::TruncatedData);
926 }
927 let compressed = &data[offset..offset + comp_len as usize];
928 let mut decompressed = vec![0u8; orig_len as usize];
929 zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
930 .map_err(|_| ValueError::InvalidUtf8)?;
931 offset += comp_len as usize;
932 let s = String::from_utf8(decompressed).map_err(|_| ValueError::InvalidUtf8)?;
933 Value::text(s)
934 }
935 DataType::BlobZstd => {
937 let (orig_len, vs1) = read_varint(&data[offset..])?;
938 offset += vs1;
939 let (comp_len, vs2) = read_varint(&data[offset..])?;
940 offset += vs2;
941 if data.len() < offset + comp_len as usize {
942 return Err(ValueError::TruncatedData);
943 }
944 let compressed = &data[offset..offset + comp_len as usize];
945 let mut decompressed = vec![0u8; orig_len as usize];
946 zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
947 .map_err(|_| ValueError::InvalidUtf8)?;
948 offset += comp_len as usize;
949 Value::Blob(decompressed)
950 }
951 };
952
953 Ok((value, offset))
954}
955
956#[cfg(test)]
957mod tests {
958 use super::*;
959 use proptest::prelude::*;
960 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
961
962 #[test]
971 fn pinned_bytes() {
972 let mut buf = Vec::new();
974 encode(&Value::Null, &mut buf);
975 assert_eq!(buf, vec![0x00], "Value::Null layout drifted");
976
977 let mut buf = Vec::new();
979 encode(&Value::Integer(-1), &mut buf);
980 assert_eq!(
981 buf,
982 vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF],
983 "Value::Integer layout drifted"
984 );
985
986 let mut buf = Vec::new();
988 encode(&Value::text("hi"), &mut buf);
989 assert_eq!(
990 buf,
991 vec![0x04, 0x02, b'h', b'i'],
992 "Value::Text layout drifted"
993 );
994
995 let mut buf = Vec::new();
997 encode(&Value::Boolean(true), &mut buf);
998 assert_eq!(buf, vec![0x06, 0x01], "Value::Boolean layout drifted");
999
1000 let mut buf = Vec::new();
1002 encode(&Value::Blob(vec![0x01, 0x02, 0x03]), &mut buf);
1003 assert_eq!(
1004 buf,
1005 vec![0x05, 0x03, 0x01, 0x02, 0x03],
1006 "Value::Blob layout drifted"
1007 );
1008 }
1009
1010 #[test]
1014 fn type_tag_matches_data_type_byte() {
1015 let samples: &[Value] = &[
1016 Value::Null,
1017 Value::Integer(0),
1018 Value::UnsignedInteger(0),
1019 Value::Float(0.0),
1020 Value::text(""),
1021 Value::Blob(Vec::new()),
1022 Value::Boolean(false),
1023 Value::Timestamp(0),
1024 Value::Duration(0),
1025 Value::Uuid([0; 16]),
1026 ];
1027 for v in samples {
1028 let tag = type_tag(v);
1029 if matches!(v, Value::Null) {
1030 assert_eq!(tag, 0);
1031 } else {
1032 assert_eq!(tag, v.data_type().to_byte());
1033 let kind = type_for_tag(tag).expect("registered tag");
1034 assert_eq!(kind, v.data_type());
1035 }
1036 }
1037 }
1038
1039 #[test]
1043 fn rejects_unknown_type_tag() {
1044 let buf = [0xFFu8];
1046 let err = decode(&buf).expect_err("unknown tag must error");
1047 assert!(matches!(err, ValueError::InvalidType(0xFF)));
1048 }
1049
1050 #[test]
1054 fn rejects_truncated_buffer() {
1055 assert!(matches!(decode(&[]), Err(ValueError::EmptyData)));
1057
1058 let mut buf = vec![DataType::Integer.to_byte()];
1060 buf.extend_from_slice(&[0x01, 0x02, 0x03]);
1061 assert!(matches!(decode(&buf), Err(ValueError::TruncatedData)));
1062
1063 let mut buf = vec![DataType::Text.to_byte()];
1065 write_varint(&mut buf, 5);
1066 buf.extend_from_slice(b"ab");
1067 assert!(matches!(decode(&buf), Err(ValueError::TruncatedData)));
1068 }
1069
1070 #[test]
1071 fn type_for_tag_handles_null_registered_and_unknown_tags() {
1072 assert_eq!(type_for_tag(0), Some(DataType::Nullable));
1073 assert_eq!(
1074 type_for_tag(DataType::Money.to_byte()),
1075 Some(DataType::Money)
1076 );
1077 assert_eq!(type_for_tag(0xFF), None);
1078 }
1079
1080 #[test]
1081 fn round_trip_every_value_variant_directly_through_registry() {
1082 let values = vec![
1083 Value::Null,
1084 Value::Integer(-1),
1085 Value::UnsignedInteger(2),
1086 Value::Float(3.5),
1087 Value::text("hello"),
1088 Value::Blob(vec![1, 2, 3]),
1089 Value::Boolean(true),
1090 Value::Timestamp(4),
1091 Value::Duration(5),
1092 Value::IpAddr(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))),
1093 Value::IpAddr(IpAddr::V6(Ipv6Addr::LOCALHOST)),
1094 Value::MacAddr([1, 2, 3, 4, 5, 6]),
1095 Value::Vector(vec![1.0, 2.0]),
1096 Value::Json(br#"{"ok":true}"#.to_vec()),
1097 Value::Uuid([7; 16]),
1098 Value::NodeRef("node".to_string()),
1099 Value::EdgeRef("edge".to_string()),
1100 Value::VectorRef("vectors".to_string(), 8),
1101 Value::RowRef("rows".to_string(), 9),
1102 Value::Color([0xAA, 0xBB, 0xCC]),
1103 Value::Email("a@example.com".to_string()),
1104 Value::Url("https://example.com".to_string()),
1105 Value::Phone(5511999),
1106 Value::Semver(1_002_003),
1107 Value::Cidr(10 << 24, 8),
1108 Value::Date(20_000),
1109 Value::Time(43_200_000),
1110 Value::Decimal(123_456),
1111 Value::EnumValue(3),
1112 Value::Array(vec![Value::Integer(1), Value::text("two")]),
1113 Value::TimestampMs(123_456),
1114 Value::Ipv4(0x7f000001),
1115 Value::Ipv6([1; 16]),
1116 Value::Subnet(10 << 24, 0xff000000),
1117 Value::Port(5432),
1118 Value::Latitude(-23_550_520),
1119 Value::Longitude(-46_633_308),
1120 Value::GeoPoint(-23_550_520, -46_633_308),
1121 Value::Country2(*b"BR"),
1122 Value::Country3(*b"BRA"),
1123 Value::Lang2(*b"pt"),
1124 Value::Lang5(*b"pt-BR"),
1125 Value::Currency(*b"USD"),
1126 Value::AssetCode("BTC".to_string()),
1127 Value::Money {
1128 asset_code: "USD".to_string(),
1129 minor_units: 1234,
1130 scale: 2,
1131 },
1132 Value::ColorAlpha([1, 2, 3, 4]),
1133 Value::BigInt(-10),
1134 Value::KeyRef("kv".to_string(), "key".to_string()),
1135 Value::DocRef("docs".to_string(), 42),
1136 Value::TableRef("users".to_string()),
1137 Value::PageRef(99),
1138 Value::Secret(vec![9, 8, 7]),
1139 Value::Password("$argon2id$v=19$hash".to_string()),
1140 ];
1141
1142 for original in values {
1143 let mut bytes = Vec::new();
1144 encode(&original, &mut bytes);
1145 let (decoded, consumed) = decode(&bytes).expect("decode");
1146 assert_eq!(consumed, bytes.len());
1147 assert_eq!(decoded, original, "{bytes:?}");
1148 }
1149 }
1150
#[test]
/// Payloads larger than `TOAST_THRESHOLD` that shrink under zstd must be
/// written with the compressed tag, and decoding must hand back the original
/// plain `Text` / `Blob` value while consuming the whole buffer.
fn compressed_text_and_blob_decode_to_plain_values() {
    // Each case pairs a highly repetitive (hence compressible) value with the
    // tag the encoder is expected to emit for it.
    let cases = [
        (Value::text("reddb ".repeat(700)), DataType::TextZstd),
        (
            Value::Blob(vec![0xAB; TOAST_THRESHOLD + 512]),
            DataType::BlobZstd,
        ),
    ];

    for (plain, compressed_tag) in cases {
        let mut encoded = Vec::new();
        encode(&plain, &mut encoded);
        assert_eq!(encoded[0], compressed_tag.to_byte());

        let (round_tripped, used) = decode(&encoded).unwrap();
        assert_eq!(used, encoded.len());
        assert_eq!(round_tripped, plain);
    }
}
1169
#[test]
/// Feeding `decode` a buffer that holds nothing but a tag byte must yield
/// `ValueError::TruncatedData` for every tag listed below; the `Nullable` tag
/// is the exception and is expected to decode to `Value::Null`.
fn decode_rejects_short_payload_for_registered_tags() {
    // Tags for which a lone tag byte is expected to be rejected as truncated.
    let payload_tags = [
        DataType::Integer,
        DataType::UnsignedInteger,
        DataType::Float,
        DataType::Text,
        DataType::Blob,
        DataType::Boolean,
        DataType::Timestamp,
        DataType::Duration,
        DataType::IpAddr,
        DataType::MacAddr,
        DataType::Vector,
        DataType::Json,
        DataType::Uuid,
        DataType::NodeRef,
        DataType::EdgeRef,
        DataType::VectorRef,
        DataType::RowRef,
        DataType::Color,
        DataType::Email,
        DataType::Url,
        DataType::Phone,
        DataType::Semver,
        DataType::Cidr,
        DataType::Date,
        DataType::Time,
        DataType::Decimal,
        DataType::Enum,
        DataType::Array,
        DataType::TimestampMs,
        DataType::Ipv4,
        DataType::Ipv6,
        DataType::Subnet,
        DataType::Port,
        DataType::Latitude,
        DataType::Longitude,
        DataType::GeoPoint,
        DataType::Country2,
        DataType::Country3,
        DataType::Lang2,
        DataType::Lang5,
        DataType::Currency,
        DataType::AssetCode,
        DataType::Money,
        DataType::ColorAlpha,
        DataType::BigInt,
        DataType::KeyRef,
        DataType::DocRef,
        DataType::TableRef,
        DataType::PageRef,
        DataType::Secret,
        DataType::Password,
        DataType::TextZstd,
        DataType::BlobZstd,
    ];

    for data_type in payload_tags {
        // The buffer is just the tag: no length prefix, no payload.
        let tag_only = [data_type.to_byte()];
        let err = decode(&tag_only).expect_err("short payload must error");
        assert_eq!(err, ValueError::TruncatedData, "{data_type:?}");
    }

    // A bare Nullable tag is a complete encoding on its own.
    let (null_value, _) = decode(&[DataType::Nullable.to_byte()]).unwrap();
    assert_eq!(null_value, Value::Null);
}
1238
#[test]
/// Structurally complete but semantically invalid payloads must surface the
/// specific error: an unknown IP version byte, or string bytes that are not
/// valid UTF-8.
fn decode_rejects_invalid_embedded_tags_and_utf8_payloads() {
    // `5` is expected to be rejected as an IP version discriminator.
    let bad_ip = decode(&[DataType::IpAddr.to_byte(), 5]);
    assert_eq!(
        bad_ip.expect_err("bad ip version"),
        ValueError::InvalidIpVersion(5)
    );

    // Layout: tag, then `1` (the varint length for Text; presumably the same
    // layout for the other string-backed tags), then 0xff, which can never
    // appear in valid UTF-8.
    let string_tags = [
        DataType::Text,
        DataType::Email,
        DataType::Url,
        DataType::AssetCode,
    ];
    for string_tag in string_tags {
        let malformed = [string_tag.to_byte(), 1, 0xff];
        assert_eq!(
            decode(&malformed).expect_err("invalid utf8"),
            ValueError::InvalidUtf8
        );
    }
}
1270
#[test]
/// Smoke test: a handful of the most common variants must survive an
/// encode/decode cycle unchanged, with the decoder consuming every byte.
fn round_trip_canonical_variants() {
    let samples = [
        Value::Null,
        Value::Integer(-12345),
        Value::text("hello"),
        Value::Boolean(true),
        Value::Blob(vec![1, 2, 3, 4, 5]),
    ];

    for sample in &samples {
        let mut encoded = Vec::new();
        encode(sample, &mut encoded);

        let (recovered, consumed) = decode(&encoded).expect("decode");
        assert_eq!(consumed, encoded.len());
        assert_eq!(sample, &recovered);
    }
}
1290
1291 fn value_variant_strategy() -> impl Strategy<Value = Value> {
1292 prop_oneof![
1293 Just(Value::Null),
1294 any::<bool>().prop_map(Value::Boolean),
1295 any::<i64>().prop_map(Value::Integer),
1296 prop_oneof![
1297 any::<f64>(),
1298 Just(f64::NAN),
1299 Just(f64::INFINITY),
1300 Just(f64::NEG_INFINITY),
1301 Just(f64::MIN_POSITIVE),
1302 Just(f64::from_bits(1)),
1303 ]
1304 .prop_map(Value::Float),
1305 proptest::collection::vec(any::<u8>(), 0..4096).prop_map(Value::Blob),
1306 prop_oneof![
1307 Just(br#"{"a":null,"b":[1,true]}"#.to_vec()),
1308 Just(br#"[]"#.to_vec()),
1309 Just(br#"{"deep":{"nest":{"leaf":[false]}}}"#.to_vec()),
1310 ]
1311 .prop_map(Value::Json),
1312 any::<i64>().prop_map(Value::Timestamp),
1313 any::<[u8; 16]>().prop_map(Value::Uuid),
1314 ]
1315 }
1316
1317 proptest! {
1318 #![proptest_config(ProptestConfig::with_cases(256))]
1319
1320 #[test]
1321 fn prop_value_codec_round_trips_remaining_variants(original in value_variant_strategy()) {
1322 let mut bytes = Vec::new();
1323 encode(&original, &mut bytes);
1324 let (recovered, consumed) = decode(&bytes).expect("decode");
1325 prop_assert_eq!(consumed, bytes.len());
1326 prop_assert!(
1327 values_equivalent(&recovered, &original),
1328 "recovered={recovered:?} original={original:?}"
1329 );
1330 }
1331 }
1332
1333 fn values_equivalent(left: &Value, right: &Value) -> bool {
1334 match (left, right) {
1335 (Value::Float(a), Value::Float(b)) => a.to_bits() == b.to_bits(),
1336 _ => left == right,
1337 }
1338 }
1339}