Skip to main content

reddb_types/
value_codec.rs

1//! On-disk codec registry for [`Value`].
2//!
3//! This module is the **single source of truth** for the byte layout
4//! of every [`Value`] variant. Adding a new variant means:
5//!
6//! 1. Add the variant to [`Value`].
7//! 2. Add the matching [`DataType`] tag (the on-disk type byte).
8//! 3. Add an arm to [`encode`] and [`decode`] in this file.
9//!
10//! That's it — no other file needs to learn the layout. The inherent
11//! [`Value::to_bytes`] / [`Value::from_bytes`] methods stay as the
12//! public API, but they only delegate here.
13//!
14//! ## Why a registry
15//!
16//! Before this module the encode / decode arms lived inside
17//! `types.rs`, mixed with display / coercion / hashing logic. A
18//! parallel `value_type_tag` helper in `storage::query` carried a
19//! third numbering scheme. The result was that every new variant
20//! required edits in three or more places and the tag spaces were
21//! free to drift.
22//!
23//! With the registry there is exactly one mapping
24//! `Value <-> on-disk bytes`. The wire protocol keeps its own,
25//! independent `VAL_*` tag space (see `wire/protocol.rs`); the two
26//! were never identical and any future unification is out of scope.
27//!
28//! ## On-disk format
29//!
30//! Bytes are unchanged versus the previous in-place implementation.
31//! The pinned-byte regression test [`tests::pinned_bytes`] guards
32//! the layout for the canonical variants (Null, Integer, Text, Bool,
33//! Blob).
34
35use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
36
37use super::types::{read_varint, write_varint, DataType, Value, ValueError};
38
39/// Alias kept for callers that prefer the registry's own name. The
40/// on-disk tag space is owned by [`DataType`]; `ValueKind` reads
41/// better in registry contexts where the type name is the schema
42/// label rather than a parser concept.
43pub type ValueKind = DataType;
44
45/// On-disk tag byte for a value.
46///
47/// `Value::Null` uses tag `0` (the same byte the legacy code reserved
48/// as the explicit null marker before the [`DataType`] enum existed).
49/// Every other variant returns `data_type().to_byte()`.
50#[inline]
51pub fn type_tag(value: &Value) -> u8 {
52    match value {
53        Value::Null => 0,
54        other => other.data_type().to_byte(),
55    }
56}
57
58/// Reverse lookup for [`type_tag`]. Returns `None` for unknown bytes;
59/// `Some(DataType::Nullable)` for the dedicated null marker `0`.
60#[inline]
61pub fn type_for_tag(tag: u8) -> Option<ValueKind> {
62    if tag == 0 {
63        Some(DataType::Nullable)
64    } else {
65        DataType::from_byte(tag)
66    }
67}
68
69/// C3 TOAST: minimum byte length to attempt zstd compression. Values
70/// shorter than this are stored uncompressed — compression overhead
71/// (~50 ns + header bytes) outweighs savings for small values.
72pub(super) const TOAST_THRESHOLD: usize = 2048;
73
74/// zstd compression level for TOAST values. Level 3 is PG's default
75/// (balanced speed/ratio).
76pub(super) const TOAST_ZSTD_LEVEL: i32 = 3;
77
78/// Encode a value into `out`, appending its on-disk byte sequence.
79///
80/// The first byte is always [`type_tag`] of `value`; the remainder
81/// is the variant-specific payload.
82pub fn encode(value: &Value, out: &mut Vec<u8>) {
83    match value {
84        Value::Null => {
85            out.push(0); // Null marker
86        }
87        Value::Integer(v) => {
88            out.push(DataType::Integer.to_byte());
89            out.extend_from_slice(&v.to_le_bytes());
90        }
91        Value::UnsignedInteger(v) => {
92            out.push(DataType::UnsignedInteger.to_byte());
93            out.extend_from_slice(&v.to_le_bytes());
94        }
95        Value::Float(v) => {
96            out.push(DataType::Float.to_byte());
97            out.extend_from_slice(&v.to_le_bytes());
98        }
99        Value::Text(s) => {
100            let bytes = s.as_bytes();
101            // C3 TOAST: compress text values larger than the threshold.
102            // Stores with `TextZstd` type byte when compression wins;
103            // falls back to plain `Text` for small values or when zstd
104            // doesn't reduce the size (e.g. already-compressed content).
105            if bytes.len() > TOAST_THRESHOLD {
106                if let Ok(compressed) = zstd::bulk::compress(bytes, TOAST_ZSTD_LEVEL) {
107                    if compressed.len() < bytes.len() {
108                        out.push(DataType::TextZstd.to_byte());
109                        // original length first (needed to pre-allocate decompression buffer)
110                        write_varint(out, bytes.len() as u64);
111                        write_varint(out, compressed.len() as u64);
112                        out.extend_from_slice(&compressed);
113                        return;
114                    }
115                }
116            }
117            out.push(DataType::Text.to_byte());
118            write_varint(out, bytes.len() as u64);
119            out.extend_from_slice(bytes);
120        }
121        Value::Blob(data) => {
122            // C3 TOAST: same pattern as Text.
123            if data.len() > TOAST_THRESHOLD {
124                if let Ok(compressed) = zstd::bulk::compress(data, TOAST_ZSTD_LEVEL) {
125                    if compressed.len() < data.len() {
126                        out.push(DataType::BlobZstd.to_byte());
127                        write_varint(out, data.len() as u64);
128                        write_varint(out, compressed.len() as u64);
129                        out.extend_from_slice(&compressed);
130                        return;
131                    }
132                }
133            }
134            out.push(DataType::Blob.to_byte());
135            write_varint(out, data.len() as u64);
136            out.extend_from_slice(data);
137        }
138        Value::Boolean(v) => {
139            out.push(DataType::Boolean.to_byte());
140            out.push(if *v { 1 } else { 0 });
141        }
142        Value::Timestamp(v) => {
143            out.push(DataType::Timestamp.to_byte());
144            out.extend_from_slice(&v.to_le_bytes());
145        }
146        Value::Duration(v) => {
147            out.push(DataType::Duration.to_byte());
148            out.extend_from_slice(&v.to_le_bytes());
149        }
150        Value::IpAddr(addr) => {
151            out.push(DataType::IpAddr.to_byte());
152            match addr {
153                IpAddr::V4(v4) => {
154                    out.push(4); // IPv4 marker
155                    out.extend_from_slice(&v4.octets());
156                }
157                IpAddr::V6(v6) => {
158                    out.push(6); // IPv6 marker
159                    out.extend_from_slice(&v6.octets());
160                }
161            }
162        }
163        Value::MacAddr(mac) => {
164            out.push(DataType::MacAddr.to_byte());
165            out.extend_from_slice(mac);
166        }
167        Value::Vector(vec) => {
168            out.push(DataType::Vector.to_byte());
169            write_varint(out, vec.len() as u64);
170            for v in vec {
171                out.extend_from_slice(&v.to_le_bytes());
172            }
173        }
174        Value::Json(data) => {
175            out.push(DataType::Json.to_byte());
176            write_varint(out, data.len() as u64);
177            out.extend_from_slice(data);
178        }
179        Value::Uuid(uuid) => {
180            out.push(DataType::Uuid.to_byte());
181            out.extend_from_slice(uuid);
182        }
183        Value::NodeRef(node_id) => {
184            out.push(DataType::NodeRef.to_byte());
185            let bytes = node_id.as_bytes();
186            write_varint(out, bytes.len() as u64);
187            out.extend_from_slice(bytes);
188        }
189        Value::EdgeRef(edge_id) => {
190            out.push(DataType::EdgeRef.to_byte());
191            let bytes = edge_id.as_bytes();
192            write_varint(out, bytes.len() as u64);
193            out.extend_from_slice(bytes);
194        }
195        Value::VectorRef(collection, vector_id) => {
196            out.push(DataType::VectorRef.to_byte());
197            let coll_bytes = collection.as_bytes();
198            write_varint(out, coll_bytes.len() as u64);
199            out.extend_from_slice(coll_bytes);
200            out.extend_from_slice(&vector_id.to_le_bytes());
201        }
202        Value::RowRef(table, row_id) => {
203            out.push(DataType::RowRef.to_byte());
204            let table_bytes = table.as_bytes();
205            write_varint(out, table_bytes.len() as u64);
206            out.extend_from_slice(table_bytes);
207            out.extend_from_slice(&row_id.to_le_bytes());
208        }
209        Value::Color(rgb) => {
210            out.push(DataType::Color.to_byte());
211            out.extend_from_slice(rgb);
212        }
213        Value::Email(s) => {
214            out.push(DataType::Email.to_byte());
215            let bytes = s.as_bytes();
216            write_varint(out, bytes.len() as u64);
217            out.extend_from_slice(bytes);
218        }
219        Value::Url(s) => {
220            out.push(DataType::Url.to_byte());
221            let bytes = s.as_bytes();
222            write_varint(out, bytes.len() as u64);
223            out.extend_from_slice(bytes);
224        }
225        Value::Phone(n) => {
226            out.push(DataType::Phone.to_byte());
227            out.extend_from_slice(&n.to_le_bytes());
228        }
229        Value::Semver(packed) => {
230            out.push(DataType::Semver.to_byte());
231            out.extend_from_slice(&packed.to_le_bytes());
232        }
233        Value::Cidr(ip, prefix) => {
234            out.push(DataType::Cidr.to_byte());
235            out.extend_from_slice(&ip.to_le_bytes());
236            out.push(*prefix);
237        }
238        Value::Date(days) => {
239            out.push(DataType::Date.to_byte());
240            out.extend_from_slice(&days.to_le_bytes());
241        }
242        Value::Time(ms) => {
243            out.push(DataType::Time.to_byte());
244            out.extend_from_slice(&ms.to_le_bytes());
245        }
246        Value::Decimal(v) => {
247            out.push(DataType::Decimal.to_byte());
248            out.extend_from_slice(&v.to_le_bytes());
249        }
250        Value::EnumValue(idx) => {
251            out.push(DataType::Enum.to_byte());
252            out.push(*idx);
253        }
254        Value::Array(elements) => {
255            out.push(DataType::Array.to_byte());
256            write_varint(out, elements.len() as u64);
257            for elem in elements {
258                encode(elem, out);
259            }
260        }
261        Value::TimestampMs(v) => {
262            out.push(DataType::TimestampMs.to_byte());
263            out.extend_from_slice(&v.to_le_bytes());
264        }
265        Value::Ipv4(v) => {
266            out.push(DataType::Ipv4.to_byte());
267            out.extend_from_slice(&v.to_le_bytes());
268        }
269        Value::Ipv6(bytes) => {
270            out.push(DataType::Ipv6.to_byte());
271            out.extend_from_slice(bytes);
272        }
273        Value::Subnet(ip, mask) => {
274            out.push(DataType::Subnet.to_byte());
275            out.extend_from_slice(&ip.to_le_bytes());
276            out.extend_from_slice(&mask.to_le_bytes());
277        }
278        Value::Port(v) => {
279            out.push(DataType::Port.to_byte());
280            out.extend_from_slice(&v.to_le_bytes());
281        }
282        Value::Latitude(v) => {
283            out.push(DataType::Latitude.to_byte());
284            out.extend_from_slice(&v.to_le_bytes());
285        }
286        Value::Longitude(v) => {
287            out.push(DataType::Longitude.to_byte());
288            out.extend_from_slice(&v.to_le_bytes());
289        }
290        Value::GeoPoint(lat, lon) => {
291            out.push(DataType::GeoPoint.to_byte());
292            out.extend_from_slice(&lat.to_le_bytes());
293            out.extend_from_slice(&lon.to_le_bytes());
294        }
295        Value::Country2(c) => {
296            out.push(DataType::Country2.to_byte());
297            out.extend_from_slice(c);
298        }
299        Value::Country3(c) => {
300            out.push(DataType::Country3.to_byte());
301            out.extend_from_slice(c);
302        }
303        Value::Lang2(c) => {
304            out.push(DataType::Lang2.to_byte());
305            out.extend_from_slice(c);
306        }
307        Value::Lang5(c) => {
308            out.push(DataType::Lang5.to_byte());
309            out.extend_from_slice(c);
310        }
311        Value::Currency(c) => {
312            out.push(DataType::Currency.to_byte());
313            out.extend_from_slice(c);
314        }
315        Value::AssetCode(code) => {
316            out.push(DataType::AssetCode.to_byte());
317            let bytes = code.as_bytes();
318            write_varint(out, bytes.len() as u64);
319            out.extend_from_slice(bytes);
320        }
321        Value::Money {
322            asset_code,
323            minor_units,
324            scale,
325        } => {
326            out.push(DataType::Money.to_byte());
327            let bytes = asset_code.as_bytes();
328            write_varint(out, bytes.len() as u64);
329            out.extend_from_slice(bytes);
330            out.push(*scale);
331            out.extend_from_slice(&minor_units.to_le_bytes());
332        }
333        Value::ColorAlpha(rgba) => {
334            out.push(DataType::ColorAlpha.to_byte());
335            out.extend_from_slice(rgba);
336        }
337        Value::BigInt(v) => {
338            out.push(DataType::BigInt.to_byte());
339            out.extend_from_slice(&v.to_le_bytes());
340        }
341        Value::KeyRef(col, key) => {
342            out.push(DataType::KeyRef.to_byte());
343            let col_bytes = col.as_bytes();
344            write_varint(out, col_bytes.len() as u64);
345            out.extend_from_slice(col_bytes);
346            let key_bytes = key.as_bytes();
347            write_varint(out, key_bytes.len() as u64);
348            out.extend_from_slice(key_bytes);
349        }
350        Value::DocRef(col, id) => {
351            out.push(DataType::DocRef.to_byte());
352            let col_bytes = col.as_bytes();
353            write_varint(out, col_bytes.len() as u64);
354            out.extend_from_slice(col_bytes);
355            out.extend_from_slice(&id.to_le_bytes());
356        }
357        Value::TableRef(name) => {
358            out.push(DataType::TableRef.to_byte());
359            let name_bytes = name.as_bytes();
360            write_varint(out, name_bytes.len() as u64);
361            out.extend_from_slice(name_bytes);
362        }
363        Value::PageRef(page_id) => {
364            out.push(DataType::PageRef.to_byte());
365            out.extend_from_slice(&page_id.to_le_bytes());
366        }
367        Value::Secret(bytes) => {
368            out.push(DataType::Secret.to_byte());
369            write_varint(out, bytes.len() as u64);
370            out.extend_from_slice(bytes);
371        }
372        Value::Password(hash) => {
373            out.push(DataType::Password.to_byte());
374            let bytes = hash.as_bytes();
375            write_varint(out, bytes.len() as u64);
376            out.extend_from_slice(bytes);
377        }
378    }
379}
380
381/// Decode a single value from `data`, returning the value and the
382/// number of bytes consumed.
383pub fn decode(data: &[u8]) -> Result<(Value, usize), ValueError> {
384    if data.is_empty() {
385        return Err(ValueError::EmptyData);
386    }
387
388    let type_byte = data[0];
389    let mut offset = 1;
390
391    // Null marker
392    if type_byte == 0 {
393        return Ok((Value::Null, 1));
394    }
395
396    let data_type = DataType::from_byte(type_byte).ok_or(ValueError::InvalidType(type_byte))?;
397
398    let value = match data_type {
399        DataType::Integer => {
400            if data.len() < offset + 8 {
401                return Err(ValueError::TruncatedData);
402            }
403            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
404            offset += 8;
405            Value::Integer(v)
406        }
407        DataType::UnsignedInteger => {
408            if data.len() < offset + 8 {
409                return Err(ValueError::TruncatedData);
410            }
411            let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
412            offset += 8;
413            Value::UnsignedInteger(v)
414        }
415        DataType::Float => {
416            if data.len() < offset + 8 {
417                return Err(ValueError::TruncatedData);
418            }
419            let v = f64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
420            offset += 8;
421            Value::Float(v)
422        }
423        DataType::Text => {
424            let (len, varint_size) = read_varint(&data[offset..])?;
425            offset += varint_size;
426            if data.len() < offset + len as usize {
427                return Err(ValueError::TruncatedData);
428            }
429            let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
430                .map_err(|_| ValueError::InvalidUtf8)?;
431            offset += len as usize;
432            Value::text(s)
433        }
434        DataType::Blob => {
435            let (len, varint_size) = read_varint(&data[offset..])?;
436            offset += varint_size;
437            if data.len() < offset + len as usize {
438                return Err(ValueError::TruncatedData);
439            }
440            let blob = data[offset..offset + len as usize].to_vec();
441            offset += len as usize;
442            Value::Blob(blob)
443        }
444        DataType::Boolean => {
445            if data.len() < offset + 1 {
446                return Err(ValueError::TruncatedData);
447            }
448            let v = data[offset] != 0;
449            offset += 1;
450            Value::Boolean(v)
451        }
452        DataType::Timestamp => {
453            if data.len() < offset + 8 {
454                return Err(ValueError::TruncatedData);
455            }
456            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
457            offset += 8;
458            Value::Timestamp(v)
459        }
460        DataType::Duration => {
461            if data.len() < offset + 8 {
462                return Err(ValueError::TruncatedData);
463            }
464            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
465            offset += 8;
466            Value::Duration(v)
467        }
468        DataType::IpAddr => {
469            if data.len() < offset + 1 {
470                return Err(ValueError::TruncatedData);
471            }
472            let version = data[offset];
473            offset += 1;
474            match version {
475                4 => {
476                    if data.len() < offset + 4 {
477                        return Err(ValueError::TruncatedData);
478                    }
479                    let octets: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
480                    offset += 4;
481                    Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
482                }
483                6 => {
484                    if data.len() < offset + 16 {
485                        return Err(ValueError::TruncatedData);
486                    }
487                    let octets: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
488                    offset += 16;
489                    Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
490                }
491                _ => return Err(ValueError::InvalidIpVersion(version)),
492            }
493        }
494        DataType::MacAddr => {
495            if data.len() < offset + 6 {
496                return Err(ValueError::TruncatedData);
497            }
498            let mac: [u8; 6] = data[offset..offset + 6].try_into().unwrap();
499            offset += 6;
500            Value::MacAddr(mac)
501        }
502        DataType::Vector => {
503            let (len, varint_size) = read_varint(&data[offset..])?;
504            offset += varint_size;
505            let float_count = len as usize;
506            if data.len() < offset + float_count * 4 {
507                return Err(ValueError::TruncatedData);
508            }
509            let mut vec = Vec::with_capacity(float_count);
510            for _ in 0..float_count {
511                let v = f32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
512                offset += 4;
513                vec.push(v);
514            }
515            Value::Vector(vec)
516        }
517        DataType::Json => {
518            let (len, varint_size) = read_varint(&data[offset..])?;
519            offset += varint_size;
520            if data.len() < offset + len as usize {
521                return Err(ValueError::TruncatedData);
522            }
523            let json = data[offset..offset + len as usize].to_vec();
524            offset += len as usize;
525            Value::Json(json)
526        }
527        DataType::Uuid => {
528            if data.len() < offset + 16 {
529                return Err(ValueError::TruncatedData);
530            }
531            let uuid: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
532            offset += 16;
533            Value::Uuid(uuid)
534        }
535        DataType::NodeRef => {
536            let (len, len_bytes) = read_varint(&data[offset..])?;
537            offset += len_bytes;
538            if data.len() < offset + len as usize {
539                return Err(ValueError::TruncatedData);
540            }
541            let node_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
542            offset += len as usize;
543            Value::NodeRef(node_id)
544        }
545        DataType::EdgeRef => {
546            let (len, len_bytes) = read_varint(&data[offset..])?;
547            offset += len_bytes;
548            if data.len() < offset + len as usize {
549                return Err(ValueError::TruncatedData);
550            }
551            let edge_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
552            offset += len as usize;
553            Value::EdgeRef(edge_id)
554        }
555        DataType::VectorRef => {
556            let (len, len_bytes) = read_varint(&data[offset..])?;
557            offset += len_bytes;
558            if data.len() < offset + len as usize + 8 {
559                return Err(ValueError::TruncatedData);
560            }
561            let collection =
562                String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
563            offset += len as usize;
564            let vector_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
565            offset += 8;
566            Value::VectorRef(collection, vector_id)
567        }
568        DataType::RowRef => {
569            let (len, len_bytes) = read_varint(&data[offset..])?;
570            offset += len_bytes;
571            if data.len() < offset + len as usize + 8 {
572                return Err(ValueError::TruncatedData);
573            }
574            let table = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
575            offset += len as usize;
576            let row_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
577            offset += 8;
578            Value::RowRef(table, row_id)
579        }
580        DataType::Color => {
581            if data.len() < offset + 3 {
582                return Err(ValueError::TruncatedData);
583            }
584            let rgb: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
585            offset += 3;
586            Value::Color(rgb)
587        }
588        DataType::Email => {
589            let (len, varint_size) = read_varint(&data[offset..])?;
590            offset += varint_size;
591            if data.len() < offset + len as usize {
592                return Err(ValueError::TruncatedData);
593            }
594            let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
595                .map_err(|_| ValueError::InvalidUtf8)?;
596            offset += len as usize;
597            Value::Email(s)
598        }
599        DataType::Url => {
600            let (len, varint_size) = read_varint(&data[offset..])?;
601            offset += varint_size;
602            if data.len() < offset + len as usize {
603                return Err(ValueError::TruncatedData);
604            }
605            let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
606                .map_err(|_| ValueError::InvalidUtf8)?;
607            offset += len as usize;
608            Value::Url(s)
609        }
610        DataType::Phone => {
611            if data.len() < offset + 8 {
612                return Err(ValueError::TruncatedData);
613            }
614            let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
615            offset += 8;
616            Value::Phone(v)
617        }
618        DataType::Semver => {
619            if data.len() < offset + 4 {
620                return Err(ValueError::TruncatedData);
621            }
622            let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
623            offset += 4;
624            Value::Semver(v)
625        }
626        DataType::Cidr => {
627            if data.len() < offset + 5 {
628                return Err(ValueError::TruncatedData);
629            }
630            let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
631            offset += 4;
632            let prefix = data[offset];
633            offset += 1;
634            Value::Cidr(ip, prefix)
635        }
636        DataType::Date => {
637            if data.len() < offset + 4 {
638                return Err(ValueError::TruncatedData);
639            }
640            let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
641            offset += 4;
642            Value::Date(v)
643        }
644        DataType::Time => {
645            if data.len() < offset + 4 {
646                return Err(ValueError::TruncatedData);
647            }
648            let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
649            offset += 4;
650            Value::Time(v)
651        }
652        DataType::Decimal => {
653            if data.len() < offset + 8 {
654                return Err(ValueError::TruncatedData);
655            }
656            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
657            offset += 8;
658            Value::Decimal(v)
659        }
660        DataType::Enum => {
661            if data.len() < offset + 1 {
662                return Err(ValueError::TruncatedData);
663            }
664            let idx = data[offset];
665            offset += 1;
666            Value::EnumValue(idx)
667        }
668        DataType::Array => {
669            let (len, varint_size) = read_varint(&data[offset..])?;
670            offset += varint_size;
671            let count = len as usize;
672            let mut elements = Vec::with_capacity(count);
673            for _ in 0..count {
674                let (elem, elem_size) = decode(&data[offset..])?;
675                offset += elem_size;
676                elements.push(elem);
677            }
678            Value::Array(elements)
679        }
680        DataType::TimestampMs => {
681            if data.len() < offset + 8 {
682                return Err(ValueError::TruncatedData);
683            }
684            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
685            offset += 8;
686            Value::TimestampMs(v)
687        }
688        DataType::Ipv4 => {
689            if data.len() < offset + 4 {
690                return Err(ValueError::TruncatedData);
691            }
692            let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
693            offset += 4;
694            Value::Ipv4(v)
695        }
696        DataType::Ipv6 => {
697            if data.len() < offset + 16 {
698                return Err(ValueError::TruncatedData);
699            }
700            let bytes: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
701            offset += 16;
702            Value::Ipv6(bytes)
703        }
704        DataType::Subnet => {
705            if data.len() < offset + 8 {
706                return Err(ValueError::TruncatedData);
707            }
708            let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
709            offset += 4;
710            let mask = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
711            offset += 4;
712            Value::Subnet(ip, mask)
713        }
714        DataType::Port => {
715            if data.len() < offset + 2 {
716                return Err(ValueError::TruncatedData);
717            }
718            let v = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap());
719            offset += 2;
720            Value::Port(v)
721        }
722        DataType::Latitude => {
723            if data.len() < offset + 4 {
724                return Err(ValueError::TruncatedData);
725            }
726            let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
727            offset += 4;
728            Value::Latitude(v)
729        }
730        DataType::Longitude => {
731            if data.len() < offset + 4 {
732                return Err(ValueError::TruncatedData);
733            }
734            let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
735            offset += 4;
736            Value::Longitude(v)
737        }
738        DataType::GeoPoint => {
739            if data.len() < offset + 8 {
740                return Err(ValueError::TruncatedData);
741            }
742            let lat = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
743            offset += 4;
744            let lon = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
745            offset += 4;
746            Value::GeoPoint(lat, lon)
747        }
748        DataType::Country2 => {
749            if data.len() < offset + 2 {
750                return Err(ValueError::TruncatedData);
751            }
752            let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
753            offset += 2;
754            Value::Country2(c)
755        }
756        DataType::Country3 => {
757            if data.len() < offset + 3 {
758                return Err(ValueError::TruncatedData);
759            }
760            let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
761            offset += 3;
762            Value::Country3(c)
763        }
764        DataType::Lang2 => {
765            if data.len() < offset + 2 {
766                return Err(ValueError::TruncatedData);
767            }
768            let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
769            offset += 2;
770            Value::Lang2(c)
771        }
772        DataType::Lang5 => {
773            if data.len() < offset + 5 {
774                return Err(ValueError::TruncatedData);
775            }
776            let c: [u8; 5] = data[offset..offset + 5].try_into().unwrap();
777            offset += 5;
778            Value::Lang5(c)
779        }
780        DataType::Currency => {
781            if data.len() < offset + 3 {
782                return Err(ValueError::TruncatedData);
783            }
784            let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
785            offset += 3;
786            Value::Currency(c)
787        }
788        DataType::AssetCode => {
789            let (len, len_bytes) = read_varint(&data[offset..])?;
790            offset += len_bytes;
791            if data.len() < offset + len as usize {
792                return Err(ValueError::TruncatedData);
793            }
794            let code = String::from_utf8(data[offset..offset + len as usize].to_vec())
795                .map_err(|_| ValueError::InvalidUtf8)?;
796            offset += len as usize;
797            Value::AssetCode(code)
798        }
799        DataType::Money => {
800            let (len, len_bytes) = read_varint(&data[offset..])?;
801            offset += len_bytes;
802            if data.len() < offset + len as usize + 1 + 8 {
803                return Err(ValueError::TruncatedData);
804            }
805            let asset_code = String::from_utf8(data[offset..offset + len as usize].to_vec())
806                .map_err(|_| ValueError::InvalidUtf8)?;
807            offset += len as usize;
808            let scale = data[offset];
809            offset += 1;
810            let minor_units = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
811            offset += 8;
812            Value::Money {
813                asset_code,
814                minor_units,
815                scale,
816            }
817        }
818        DataType::ColorAlpha => {
819            if data.len() < offset + 4 {
820                return Err(ValueError::TruncatedData);
821            }
822            let rgba: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
823            offset += 4;
824            Value::ColorAlpha(rgba)
825        }
826        DataType::BigInt => {
827            if data.len() < offset + 8 {
828                return Err(ValueError::TruncatedData);
829            }
830            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
831            offset += 8;
832            Value::BigInt(v)
833        }
834        DataType::KeyRef => {
835            let (col_len, col_varint) = read_varint(&data[offset..])?;
836            offset += col_varint;
837            if data.len() < offset + col_len as usize {
838                return Err(ValueError::TruncatedData);
839            }
840            let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
841                .map_err(|_| ValueError::InvalidUtf8)?;
842            offset += col_len as usize;
843            let (key_len, key_varint) = read_varint(&data[offset..])?;
844            offset += key_varint;
845            if data.len() < offset + key_len as usize {
846                return Err(ValueError::TruncatedData);
847            }
848            let key = String::from_utf8(data[offset..offset + key_len as usize].to_vec())
849                .map_err(|_| ValueError::InvalidUtf8)?;
850            offset += key_len as usize;
851            Value::KeyRef(col, key)
852        }
853        DataType::DocRef => {
854            let (col_len, col_varint) = read_varint(&data[offset..])?;
855            offset += col_varint;
856            if data.len() < offset + col_len as usize + 8 {
857                return Err(ValueError::TruncatedData);
858            }
859            let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
860                .map_err(|_| ValueError::InvalidUtf8)?;
861            offset += col_len as usize;
862            let id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
863            offset += 8;
864            Value::DocRef(col, id)
865        }
866        DataType::TableRef => {
867            let (len, varint_size) = read_varint(&data[offset..])?;
868            offset += varint_size;
869            if data.len() < offset + len as usize {
870                return Err(ValueError::TruncatedData);
871            }
872            let name = String::from_utf8(data[offset..offset + len as usize].to_vec())
873                .map_err(|_| ValueError::InvalidUtf8)?;
874            offset += len as usize;
875            Value::TableRef(name)
876        }
877        DataType::PageRef => {
878            if data.len() < offset + 4 {
879                return Err(ValueError::TruncatedData);
880            }
881            let page_id = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
882            offset += 4;
883            Value::PageRef(page_id)
884        }
885        DataType::Secret => {
886            let (len, varint_size) = read_varint(&data[offset..])?;
887            offset += varint_size;
888            if data.len() < offset + len as usize {
889                return Err(ValueError::TruncatedData);
890            }
891            let bytes = data[offset..offset + len as usize].to_vec();
892            offset += len as usize;
893            Value::Secret(bytes)
894        }
895        DataType::Password => {
896            let (len, varint_size) = read_varint(&data[offset..])?;
897            offset += varint_size;
898            if data.len() < offset + len as usize {
899                return Err(ValueError::TruncatedData);
900            }
901            let hash = String::from_utf8(data[offset..offset + len as usize].to_vec())
902                .map_err(|_| ValueError::InvalidUtf8)?;
903            offset += len as usize;
904            Value::Password(hash)
905        }
906        DataType::Nullable => {
907            // Nullable without inner type means null
908            Value::Null
909        }
910        DataType::Unknown => {
911            // Polymorphic placeholder — never stored on disk.
912            // Reaching here means corrupted data or a bug; treat
913            // as null to stay forward-compatible.
914            Value::Null
915        }
916        // C3 TOAST: zstd-compressed Text — transparent decompression.
917        // Wire: encode writes TextZstd when text > TOAST_THRESHOLD and
918        // compression saves space; decode always materialises as Value::Text.
919        DataType::TextZstd => {
920            let (orig_len, vs1) = read_varint(&data[offset..])?;
921            offset += vs1;
922            let (comp_len, vs2) = read_varint(&data[offset..])?;
923            offset += vs2;
924            if data.len() < offset + comp_len as usize {
925                return Err(ValueError::TruncatedData);
926            }
927            let compressed = &data[offset..offset + comp_len as usize];
928            let mut decompressed = vec![0u8; orig_len as usize];
929            zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
930                .map_err(|_| ValueError::InvalidUtf8)?;
931            offset += comp_len as usize;
932            let s = String::from_utf8(decompressed).map_err(|_| ValueError::InvalidUtf8)?;
933            Value::text(s)
934        }
935        // C3 TOAST: zstd-compressed Blob — same pattern as TextZstd.
936        DataType::BlobZstd => {
937            let (orig_len, vs1) = read_varint(&data[offset..])?;
938            offset += vs1;
939            let (comp_len, vs2) = read_varint(&data[offset..])?;
940            offset += vs2;
941            if data.len() < offset + comp_len as usize {
942                return Err(ValueError::TruncatedData);
943            }
944            let compressed = &data[offset..offset + comp_len as usize];
945            let mut decompressed = vec![0u8; orig_len as usize];
946            zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
947                .map_err(|_| ValueError::InvalidUtf8)?;
948            offset += comp_len as usize;
949            Value::Blob(decompressed)
950        }
951    };
952
953    Ok((value, offset))
954}
955
956#[cfg(test)]
957mod tests {
958    use super::*;
959    use proptest::prelude::*;
960    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
961
962    /// Pinned on-disk byte layout for the canonical [`Value`]
963    /// variants. **If this test breaks, callers with persisted data
964    /// will fail to read older files** — only update the expected
965    /// bytes when you have intentionally migrated the format. A
966    /// silent rewrite is a corruption bug.
967    ///
968    /// Variants pinned: Null, Integer, Text, Boolean, Blob — the
969    /// minimum five required by the codec registry contract.
970    #[test]
971    fn pinned_bytes() {
972        // Null: just the null marker (0x00).
973        let mut buf = Vec::new();
974        encode(&Value::Null, &mut buf);
975        assert_eq!(buf, vec![0x00], "Value::Null layout drifted");
976
977        // Integer(-1): tag (Integer = 1) + i64 little-endian.
978        let mut buf = Vec::new();
979        encode(&Value::Integer(-1), &mut buf);
980        assert_eq!(
981            buf,
982            vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF],
983            "Value::Integer layout drifted"
984        );
985
986        // Text("hi"): tag (Text = 4) + varint(2) + UTF-8 bytes.
987        let mut buf = Vec::new();
988        encode(&Value::text("hi"), &mut buf);
989        assert_eq!(
990            buf,
991            vec![0x04, 0x02, b'h', b'i'],
992            "Value::Text layout drifted"
993        );
994
995        // Boolean(true): tag (Boolean = 6) + 0x01.
996        let mut buf = Vec::new();
997        encode(&Value::Boolean(true), &mut buf);
998        assert_eq!(buf, vec![0x06, 0x01], "Value::Boolean layout drifted");
999
1000        // Blob([0x01, 0x02, 0x03]): tag (Blob = 5) + varint(3) + raw.
1001        let mut buf = Vec::new();
1002        encode(&Value::Blob(vec![0x01, 0x02, 0x03]), &mut buf);
1003        assert_eq!(
1004            buf,
1005            vec![0x05, 0x03, 0x01, 0x02, 0x03],
1006            "Value::Blob layout drifted"
1007        );
1008    }
1009
1010    /// Sanity check that the registry's [`type_tag`] lines up with
1011    /// [`DataType::to_byte`] for every storable variant — this is
1012    /// what guarantees the on-disk tag space stays single-source.
1013    #[test]
1014    fn type_tag_matches_data_type_byte() {
1015        let samples: &[Value] = &[
1016            Value::Null,
1017            Value::Integer(0),
1018            Value::UnsignedInteger(0),
1019            Value::Float(0.0),
1020            Value::text(""),
1021            Value::Blob(Vec::new()),
1022            Value::Boolean(false),
1023            Value::Timestamp(0),
1024            Value::Duration(0),
1025            Value::Uuid([0; 16]),
1026        ];
1027        for v in samples {
1028            let tag = type_tag(v);
1029            if matches!(v, Value::Null) {
1030                assert_eq!(tag, 0);
1031            } else {
1032                assert_eq!(tag, v.data_type().to_byte());
1033                let kind = type_for_tag(tag).expect("registered tag");
1034                assert_eq!(kind, v.data_type());
1035            }
1036        }
1037    }
1038
1039    /// Decoder must reject a type byte it does not recognise rather
1040    /// than silently returning a default. Guards against on-disk
1041    /// corruption being interpreted as a valid value.
1042    #[test]
1043    fn rejects_unknown_type_tag() {
1044        // 0xFF is outside the registered DataType range.
1045        let buf = [0xFFu8];
1046        let err = decode(&buf).expect_err("unknown tag must error");
1047        assert!(matches!(err, ValueError::InvalidType(0xFF)));
1048    }
1049
1050    /// A buffer truncated mid-payload must surface as
1051    /// `TruncatedData`, not panic on a slice index. Covers the
1052    /// fixed-width and length-prefixed code paths.
1053    #[test]
1054    fn rejects_truncated_buffer() {
1055        // Empty buffer.
1056        assert!(matches!(decode(&[]), Err(ValueError::EmptyData)));
1057
1058        // Integer tag (0x01) needs 8 payload bytes; supply 3.
1059        let mut buf = vec![DataType::Integer.to_byte()];
1060        buf.extend_from_slice(&[0x01, 0x02, 0x03]);
1061        assert!(matches!(decode(&buf), Err(ValueError::TruncatedData)));
1062
1063        // Text tag (0x04) with varint len=5 but only 2 payload bytes.
1064        let mut buf = vec![DataType::Text.to_byte()];
1065        write_varint(&mut buf, 5);
1066        buf.extend_from_slice(b"ab");
1067        assert!(matches!(decode(&buf), Err(ValueError::TruncatedData)));
1068    }
1069
1070    #[test]
1071    fn type_for_tag_handles_null_registered_and_unknown_tags() {
1072        assert_eq!(type_for_tag(0), Some(DataType::Nullable));
1073        assert_eq!(
1074            type_for_tag(DataType::Money.to_byte()),
1075            Some(DataType::Money)
1076        );
1077        assert_eq!(type_for_tag(0xFF), None);
1078    }
1079
1080    #[test]
1081    fn round_trip_every_value_variant_directly_through_registry() {
1082        let values = vec![
1083            Value::Null,
1084            Value::Integer(-1),
1085            Value::UnsignedInteger(2),
1086            Value::Float(3.5),
1087            Value::text("hello"),
1088            Value::Blob(vec![1, 2, 3]),
1089            Value::Boolean(true),
1090            Value::Timestamp(4),
1091            Value::Duration(5),
1092            Value::IpAddr(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))),
1093            Value::IpAddr(IpAddr::V6(Ipv6Addr::LOCALHOST)),
1094            Value::MacAddr([1, 2, 3, 4, 5, 6]),
1095            Value::Vector(vec![1.0, 2.0]),
1096            Value::Json(br#"{"ok":true}"#.to_vec()),
1097            Value::Uuid([7; 16]),
1098            Value::NodeRef("node".to_string()),
1099            Value::EdgeRef("edge".to_string()),
1100            Value::VectorRef("vectors".to_string(), 8),
1101            Value::RowRef("rows".to_string(), 9),
1102            Value::Color([0xAA, 0xBB, 0xCC]),
1103            Value::Email("a@example.com".to_string()),
1104            Value::Url("https://example.com".to_string()),
1105            Value::Phone(5511999),
1106            Value::Semver(1_002_003),
1107            Value::Cidr(10 << 24, 8),
1108            Value::Date(20_000),
1109            Value::Time(43_200_000),
1110            Value::Decimal(123_456),
1111            Value::EnumValue(3),
1112            Value::Array(vec![Value::Integer(1), Value::text("two")]),
1113            Value::TimestampMs(123_456),
1114            Value::Ipv4(0x7f000001),
1115            Value::Ipv6([1; 16]),
1116            Value::Subnet(10 << 24, 0xff000000),
1117            Value::Port(5432),
1118            Value::Latitude(-23_550_520),
1119            Value::Longitude(-46_633_308),
1120            Value::GeoPoint(-23_550_520, -46_633_308),
1121            Value::Country2(*b"BR"),
1122            Value::Country3(*b"BRA"),
1123            Value::Lang2(*b"pt"),
1124            Value::Lang5(*b"pt-BR"),
1125            Value::Currency(*b"USD"),
1126            Value::AssetCode("BTC".to_string()),
1127            Value::Money {
1128                asset_code: "USD".to_string(),
1129                minor_units: 1234,
1130                scale: 2,
1131            },
1132            Value::ColorAlpha([1, 2, 3, 4]),
1133            Value::BigInt(-10),
1134            Value::KeyRef("kv".to_string(), "key".to_string()),
1135            Value::DocRef("docs".to_string(), 42),
1136            Value::TableRef("users".to_string()),
1137            Value::PageRef(99),
1138            Value::Secret(vec![9, 8, 7]),
1139            Value::Password("$argon2id$v=19$hash".to_string()),
1140        ];
1141
1142        for original in values {
1143            let mut bytes = Vec::new();
1144            encode(&original, &mut bytes);
1145            let (decoded, consumed) = decode(&bytes).expect("decode");
1146            assert_eq!(consumed, bytes.len());
1147            assert_eq!(decoded, original, "{bytes:?}");
1148        }
1149    }
1150
1151    #[test]
1152    fn compressed_text_and_blob_decode_to_plain_values() {
1153        let text = Value::text("reddb ".repeat(700));
1154        let mut bytes = Vec::new();
1155        encode(&text, &mut bytes);
1156        assert_eq!(bytes[0], DataType::TextZstd.to_byte());
1157        let (decoded, consumed) = decode(&bytes).unwrap();
1158        assert_eq!(consumed, bytes.len());
1159        assert_eq!(decoded, text);
1160
1161        let blob = Value::Blob(vec![0xAB; TOAST_THRESHOLD + 512]);
1162        let mut bytes = Vec::new();
1163        encode(&blob, &mut bytes);
1164        assert_eq!(bytes[0], DataType::BlobZstd.to_byte());
1165        let (decoded, consumed) = decode(&bytes).unwrap();
1166        assert_eq!(consumed, bytes.len());
1167        assert_eq!(decoded, blob);
1168    }
1169
1170    #[test]
1171    fn decode_rejects_short_payload_for_registered_tags() {
1172        let truncated_tags = [
1173            DataType::Integer,
1174            DataType::UnsignedInteger,
1175            DataType::Float,
1176            DataType::Text,
1177            DataType::Blob,
1178            DataType::Boolean,
1179            DataType::Timestamp,
1180            DataType::Duration,
1181            DataType::IpAddr,
1182            DataType::MacAddr,
1183            DataType::Vector,
1184            DataType::Json,
1185            DataType::Uuid,
1186            DataType::NodeRef,
1187            DataType::EdgeRef,
1188            DataType::VectorRef,
1189            DataType::RowRef,
1190            DataType::Color,
1191            DataType::Email,
1192            DataType::Url,
1193            DataType::Phone,
1194            DataType::Semver,
1195            DataType::Cidr,
1196            DataType::Date,
1197            DataType::Time,
1198            DataType::Decimal,
1199            DataType::Enum,
1200            DataType::Array,
1201            DataType::TimestampMs,
1202            DataType::Ipv4,
1203            DataType::Ipv6,
1204            DataType::Subnet,
1205            DataType::Port,
1206            DataType::Latitude,
1207            DataType::Longitude,
1208            DataType::GeoPoint,
1209            DataType::Country2,
1210            DataType::Country3,
1211            DataType::Lang2,
1212            DataType::Lang5,
1213            DataType::Currency,
1214            DataType::AssetCode,
1215            DataType::Money,
1216            DataType::ColorAlpha,
1217            DataType::BigInt,
1218            DataType::KeyRef,
1219            DataType::DocRef,
1220            DataType::TableRef,
1221            DataType::PageRef,
1222            DataType::Secret,
1223            DataType::Password,
1224            DataType::TextZstd,
1225            DataType::BlobZstd,
1226        ];
1227
1228        for data_type in truncated_tags {
1229            let err = decode(&[data_type.to_byte()]).expect_err("short payload must error");
1230            assert_eq!(err, ValueError::TruncatedData, "{data_type:?}");
1231        }
1232
1233        assert_eq!(
1234            decode(&[DataType::Nullable.to_byte()]).unwrap().0,
1235            Value::Null
1236        );
1237    }
1238
1239    #[test]
1240    fn decode_rejects_invalid_embedded_tags_and_utf8_payloads() {
1241        assert_eq!(
1242            decode(&[DataType::IpAddr.to_byte(), 5]).expect_err("bad ip version"),
1243            ValueError::InvalidIpVersion(5)
1244        );
1245
1246        let invalid_text = [DataType::Text.to_byte(), 1, 0xff];
1247        assert_eq!(
1248            decode(&invalid_text).expect_err("invalid utf8"),
1249            ValueError::InvalidUtf8
1250        );
1251
1252        let invalid_email = [DataType::Email.to_byte(), 1, 0xff];
1253        assert_eq!(
1254            decode(&invalid_email).expect_err("invalid utf8"),
1255            ValueError::InvalidUtf8
1256        );
1257
1258        let invalid_url = [DataType::Url.to_byte(), 1, 0xff];
1259        assert_eq!(
1260            decode(&invalid_url).expect_err("invalid utf8"),
1261            ValueError::InvalidUtf8
1262        );
1263
1264        let invalid_asset = [DataType::AssetCode.to_byte(), 1, 0xff];
1265        assert_eq!(
1266            decode(&invalid_asset).expect_err("invalid utf8"),
1267            ValueError::InvalidUtf8
1268        );
1269    }
1270
1271    /// Round-trip: encode then decode must recover the original
1272    /// value, byte for byte.
1273    #[test]
1274    fn round_trip_canonical_variants() {
1275        let cases = vec![
1276            Value::Null,
1277            Value::Integer(-12345),
1278            Value::text("hello"),
1279            Value::Boolean(true),
1280            Value::Blob(vec![1, 2, 3, 4, 5]),
1281        ];
1282        for original in cases {
1283            let mut bytes = Vec::new();
1284            encode(&original, &mut bytes);
1285            let (recovered, consumed) = decode(&bytes).expect("decode");
1286            assert_eq!(consumed, bytes.len());
1287            assert_eq!(original, recovered);
1288        }
1289    }
1290
1291    fn value_variant_strategy() -> impl Strategy<Value = Value> {
1292        prop_oneof![
1293            Just(Value::Null),
1294            any::<bool>().prop_map(Value::Boolean),
1295            any::<i64>().prop_map(Value::Integer),
1296            prop_oneof![
1297                any::<f64>(),
1298                Just(f64::NAN),
1299                Just(f64::INFINITY),
1300                Just(f64::NEG_INFINITY),
1301                Just(f64::MIN_POSITIVE),
1302                Just(f64::from_bits(1)),
1303            ]
1304            .prop_map(Value::Float),
1305            proptest::collection::vec(any::<u8>(), 0..4096).prop_map(Value::Blob),
1306            prop_oneof![
1307                Just(br#"{"a":null,"b":[1,true]}"#.to_vec()),
1308                Just(br#"[]"#.to_vec()),
1309                Just(br#"{"deep":{"nest":{"leaf":[false]}}}"#.to_vec()),
1310            ]
1311            .prop_map(Value::Json),
1312            any::<i64>().prop_map(Value::Timestamp),
1313            any::<[u8; 16]>().prop_map(Value::Uuid),
1314        ]
1315    }
1316
1317    proptest! {
1318        #![proptest_config(ProptestConfig::with_cases(256))]
1319
1320        #[test]
1321        fn prop_value_codec_round_trips_remaining_variants(original in value_variant_strategy()) {
1322            let mut bytes = Vec::new();
1323            encode(&original, &mut bytes);
1324            let (recovered, consumed) = decode(&bytes).expect("decode");
1325            prop_assert_eq!(consumed, bytes.len());
1326            prop_assert!(
1327                values_equivalent(&recovered, &original),
1328                "recovered={recovered:?} original={original:?}"
1329            );
1330        }
1331    }
1332
1333    fn values_equivalent(left: &Value, right: &Value) -> bool {
1334        match (left, right) {
1335            (Value::Float(a), Value::Float(b)) => a.to_bits() == b.to_bits(),
1336            _ => left == right,
1337        }
1338    }
1339}