Skip to main content

reddb_server/storage/schema/
value_codec.rs

1//! On-disk codec registry for [`Value`].
2//!
3//! This module is the **single source of truth** for the byte layout
4//! of every [`Value`] variant. Adding a new variant means:
5//!
6//! 1. Add the variant to [`Value`].
7//! 2. Add the matching [`DataType`] tag (the on-disk type byte).
8//! 3. Add an arm to [`encode`] and [`decode`] in this file.
9//!
10//! That's it — no other file needs to learn the layout. The inherent
11//! [`Value::to_bytes`] / [`Value::from_bytes`] methods stay as the
12//! public API, but they only delegate here.
13//!
14//! ## Why a registry
15//!
16//! Before this module the encode / decode arms lived inside
17//! `types.rs`, mixed with display / coercion / hashing logic. A
18//! parallel `value_type_tag` helper in `storage::query` carried a
19//! third numbering scheme. The result was that every new variant
20//! required edits in three or more places and the tag spaces were
21//! free to drift.
22//!
23//! With the registry there is exactly one mapping
24//! `Value <-> on-disk bytes`. The wire protocol keeps its own,
25//! independent `VAL_*` tag space (see `wire/protocol.rs`); the two
26//! were never identical and any future unification is out of scope.
27//!
28//! ## On-disk format
29//!
30//! Bytes are unchanged versus the previous in-place implementation.
31//! The pinned-byte regression test [`tests::pinned_bytes`] guards
32//! the layout for the canonical variants (Null, Integer, Text, Bool,
33//! Blob).
34
35use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
36
37use super::types::{read_varint, write_varint, DataType, Value, ValueError};
38
/// Registry-facing alias for [`DataType`].
///
/// Kept for callers that prefer the registry's own name. The on-disk
/// tag space is owned by [`DataType`]; `ValueKind` reads better in
/// registry contexts where the type name is the schema label rather
/// than a parser concept. Being a plain `type` alias, the two names
/// are fully interchangeable.
pub type ValueKind = DataType;
44
45/// On-disk tag byte for a value.
46///
47/// `Value::Null` uses tag `0` (the same byte the legacy code reserved
48/// as the explicit null marker before the [`DataType`] enum existed).
49/// Every other variant returns `data_type().to_byte()`.
50#[inline]
51pub fn type_tag(value: &Value) -> u8 {
52    match value {
53        Value::Null => 0,
54        other => other.data_type().to_byte(),
55    }
56}
57
58/// Reverse lookup for [`type_tag`]. Returns `None` for unknown bytes;
59/// `Some(DataType::Nullable)` for the dedicated null marker `0`.
60#[inline]
61pub fn type_for_tag(tag: u8) -> Option<ValueKind> {
62    if tag == 0 {
63        Some(DataType::Nullable)
64    } else {
65        DataType::from_byte(tag)
66    }
67}
68
/// C3 TOAST: payload size, in bytes, above which zstd compression is
/// attempted.
///
/// [`encode`] only tries compression for `Text` / `Blob` payloads that
/// are strictly longer than this value; payloads of exactly this size
/// or smaller are stored uncompressed — compression overhead
/// (~50 ns + header bytes) outweighs savings for small values.
pub(super) const TOAST_THRESHOLD: usize = 2048;
73
/// zstd compression level for TOAST values, passed to
/// `zstd::bulk::compress` by [`encode`]. Level 3 is PG's default
/// (balanced speed/ratio).
pub(super) const TOAST_ZSTD_LEVEL: i32 = 3;
77
78/// Encode a value into `out`, appending its on-disk byte sequence.
79///
80/// The first byte is always [`type_tag`] of `value`; the remainder
81/// is the variant-specific payload.
82pub fn encode(value: &Value, out: &mut Vec<u8>) {
83    match value {
84        Value::Null => {
85            out.push(0); // Null marker
86        }
87        Value::Integer(v) => {
88            out.push(DataType::Integer.to_byte());
89            out.extend_from_slice(&v.to_le_bytes());
90        }
91        Value::UnsignedInteger(v) => {
92            out.push(DataType::UnsignedInteger.to_byte());
93            out.extend_from_slice(&v.to_le_bytes());
94        }
95        Value::Float(v) => {
96            out.push(DataType::Float.to_byte());
97            out.extend_from_slice(&v.to_le_bytes());
98        }
99        Value::Text(s) => {
100            let bytes = s.as_bytes();
101            // C3 TOAST: compress text values larger than the threshold.
102            // Stores with `TextZstd` type byte when compression wins;
103            // falls back to plain `Text` for small values or when zstd
104            // doesn't reduce the size (e.g. already-compressed content).
105            if bytes.len() > TOAST_THRESHOLD {
106                if let Ok(compressed) = zstd::bulk::compress(bytes, TOAST_ZSTD_LEVEL) {
107                    if compressed.len() < bytes.len() {
108                        out.push(DataType::TextZstd.to_byte());
109                        // original length first (needed to pre-allocate decompression buffer)
110                        write_varint(out, bytes.len() as u64);
111                        write_varint(out, compressed.len() as u64);
112                        out.extend_from_slice(&compressed);
113                        return;
114                    }
115                }
116            }
117            out.push(DataType::Text.to_byte());
118            write_varint(out, bytes.len() as u64);
119            out.extend_from_slice(bytes);
120        }
121        Value::Blob(data) => {
122            // C3 TOAST: same pattern as Text.
123            if data.len() > TOAST_THRESHOLD {
124                if let Ok(compressed) = zstd::bulk::compress(data, TOAST_ZSTD_LEVEL) {
125                    if compressed.len() < data.len() {
126                        out.push(DataType::BlobZstd.to_byte());
127                        write_varint(out, data.len() as u64);
128                        write_varint(out, compressed.len() as u64);
129                        out.extend_from_slice(&compressed);
130                        return;
131                    }
132                }
133            }
134            out.push(DataType::Blob.to_byte());
135            write_varint(out, data.len() as u64);
136            out.extend_from_slice(data);
137        }
138        Value::Boolean(v) => {
139            out.push(DataType::Boolean.to_byte());
140            out.push(if *v { 1 } else { 0 });
141        }
142        Value::Timestamp(v) => {
143            out.push(DataType::Timestamp.to_byte());
144            out.extend_from_slice(&v.to_le_bytes());
145        }
146        Value::Duration(v) => {
147            out.push(DataType::Duration.to_byte());
148            out.extend_from_slice(&v.to_le_bytes());
149        }
150        Value::IpAddr(addr) => {
151            out.push(DataType::IpAddr.to_byte());
152            match addr {
153                IpAddr::V4(v4) => {
154                    out.push(4); // IPv4 marker
155                    out.extend_from_slice(&v4.octets());
156                }
157                IpAddr::V6(v6) => {
158                    out.push(6); // IPv6 marker
159                    out.extend_from_slice(&v6.octets());
160                }
161            }
162        }
163        Value::MacAddr(mac) => {
164            out.push(DataType::MacAddr.to_byte());
165            out.extend_from_slice(mac);
166        }
167        Value::Vector(vec) => {
168            out.push(DataType::Vector.to_byte());
169            write_varint(out, vec.len() as u64);
170            for v in vec {
171                out.extend_from_slice(&v.to_le_bytes());
172            }
173        }
174        Value::Json(data) => {
175            out.push(DataType::Json.to_byte());
176            write_varint(out, data.len() as u64);
177            out.extend_from_slice(data);
178        }
179        Value::Uuid(uuid) => {
180            out.push(DataType::Uuid.to_byte());
181            out.extend_from_slice(uuid);
182        }
183        Value::NodeRef(node_id) => {
184            out.push(DataType::NodeRef.to_byte());
185            let bytes = node_id.as_bytes();
186            write_varint(out, bytes.len() as u64);
187            out.extend_from_slice(bytes);
188        }
189        Value::EdgeRef(edge_id) => {
190            out.push(DataType::EdgeRef.to_byte());
191            let bytes = edge_id.as_bytes();
192            write_varint(out, bytes.len() as u64);
193            out.extend_from_slice(bytes);
194        }
195        Value::VectorRef(collection, vector_id) => {
196            out.push(DataType::VectorRef.to_byte());
197            let coll_bytes = collection.as_bytes();
198            write_varint(out, coll_bytes.len() as u64);
199            out.extend_from_slice(coll_bytes);
200            out.extend_from_slice(&vector_id.to_le_bytes());
201        }
202        Value::RowRef(table, row_id) => {
203            out.push(DataType::RowRef.to_byte());
204            let table_bytes = table.as_bytes();
205            write_varint(out, table_bytes.len() as u64);
206            out.extend_from_slice(table_bytes);
207            out.extend_from_slice(&row_id.to_le_bytes());
208        }
209        Value::Color(rgb) => {
210            out.push(DataType::Color.to_byte());
211            out.extend_from_slice(rgb);
212        }
213        Value::Email(s) => {
214            out.push(DataType::Email.to_byte());
215            let bytes = s.as_bytes();
216            write_varint(out, bytes.len() as u64);
217            out.extend_from_slice(bytes);
218        }
219        Value::Url(s) => {
220            out.push(DataType::Url.to_byte());
221            let bytes = s.as_bytes();
222            write_varint(out, bytes.len() as u64);
223            out.extend_from_slice(bytes);
224        }
225        Value::Phone(n) => {
226            out.push(DataType::Phone.to_byte());
227            out.extend_from_slice(&n.to_le_bytes());
228        }
229        Value::Semver(packed) => {
230            out.push(DataType::Semver.to_byte());
231            out.extend_from_slice(&packed.to_le_bytes());
232        }
233        Value::Cidr(ip, prefix) => {
234            out.push(DataType::Cidr.to_byte());
235            out.extend_from_slice(&ip.to_le_bytes());
236            out.push(*prefix);
237        }
238        Value::Date(days) => {
239            out.push(DataType::Date.to_byte());
240            out.extend_from_slice(&days.to_le_bytes());
241        }
242        Value::Time(ms) => {
243            out.push(DataType::Time.to_byte());
244            out.extend_from_slice(&ms.to_le_bytes());
245        }
246        Value::Decimal(v) => {
247            out.push(DataType::Decimal.to_byte());
248            out.extend_from_slice(&v.to_le_bytes());
249        }
250        Value::EnumValue(idx) => {
251            out.push(DataType::Enum.to_byte());
252            out.push(*idx);
253        }
254        Value::Array(elements) => {
255            out.push(DataType::Array.to_byte());
256            write_varint(out, elements.len() as u64);
257            for elem in elements {
258                encode(elem, out);
259            }
260        }
261        Value::TimestampMs(v) => {
262            out.push(DataType::TimestampMs.to_byte());
263            out.extend_from_slice(&v.to_le_bytes());
264        }
265        Value::Ipv4(v) => {
266            out.push(DataType::Ipv4.to_byte());
267            out.extend_from_slice(&v.to_le_bytes());
268        }
269        Value::Ipv6(bytes) => {
270            out.push(DataType::Ipv6.to_byte());
271            out.extend_from_slice(bytes);
272        }
273        Value::Subnet(ip, mask) => {
274            out.push(DataType::Subnet.to_byte());
275            out.extend_from_slice(&ip.to_le_bytes());
276            out.extend_from_slice(&mask.to_le_bytes());
277        }
278        Value::Port(v) => {
279            out.push(DataType::Port.to_byte());
280            out.extend_from_slice(&v.to_le_bytes());
281        }
282        Value::Latitude(v) => {
283            out.push(DataType::Latitude.to_byte());
284            out.extend_from_slice(&v.to_le_bytes());
285        }
286        Value::Longitude(v) => {
287            out.push(DataType::Longitude.to_byte());
288            out.extend_from_slice(&v.to_le_bytes());
289        }
290        Value::GeoPoint(lat, lon) => {
291            out.push(DataType::GeoPoint.to_byte());
292            out.extend_from_slice(&lat.to_le_bytes());
293            out.extend_from_slice(&lon.to_le_bytes());
294        }
295        Value::Country2(c) => {
296            out.push(DataType::Country2.to_byte());
297            out.extend_from_slice(c);
298        }
299        Value::Country3(c) => {
300            out.push(DataType::Country3.to_byte());
301            out.extend_from_slice(c);
302        }
303        Value::Lang2(c) => {
304            out.push(DataType::Lang2.to_byte());
305            out.extend_from_slice(c);
306        }
307        Value::Lang5(c) => {
308            out.push(DataType::Lang5.to_byte());
309            out.extend_from_slice(c);
310        }
311        Value::Currency(c) => {
312            out.push(DataType::Currency.to_byte());
313            out.extend_from_slice(c);
314        }
315        Value::AssetCode(code) => {
316            out.push(DataType::AssetCode.to_byte());
317            let bytes = code.as_bytes();
318            write_varint(out, bytes.len() as u64);
319            out.extend_from_slice(bytes);
320        }
321        Value::Money {
322            asset_code,
323            minor_units,
324            scale,
325        } => {
326            out.push(DataType::Money.to_byte());
327            let bytes = asset_code.as_bytes();
328            write_varint(out, bytes.len() as u64);
329            out.extend_from_slice(bytes);
330            out.push(*scale);
331            out.extend_from_slice(&minor_units.to_le_bytes());
332        }
333        Value::ColorAlpha(rgba) => {
334            out.push(DataType::ColorAlpha.to_byte());
335            out.extend_from_slice(rgba);
336        }
337        Value::BigInt(v) => {
338            out.push(DataType::BigInt.to_byte());
339            out.extend_from_slice(&v.to_le_bytes());
340        }
341        Value::KeyRef(col, key) => {
342            out.push(DataType::KeyRef.to_byte());
343            let col_bytes = col.as_bytes();
344            write_varint(out, col_bytes.len() as u64);
345            out.extend_from_slice(col_bytes);
346            let key_bytes = key.as_bytes();
347            write_varint(out, key_bytes.len() as u64);
348            out.extend_from_slice(key_bytes);
349        }
350        Value::DocRef(col, id) => {
351            out.push(DataType::DocRef.to_byte());
352            let col_bytes = col.as_bytes();
353            write_varint(out, col_bytes.len() as u64);
354            out.extend_from_slice(col_bytes);
355            out.extend_from_slice(&id.to_le_bytes());
356        }
357        Value::TableRef(name) => {
358            out.push(DataType::TableRef.to_byte());
359            let name_bytes = name.as_bytes();
360            write_varint(out, name_bytes.len() as u64);
361            out.extend_from_slice(name_bytes);
362        }
363        Value::PageRef(page_id) => {
364            out.push(DataType::PageRef.to_byte());
365            out.extend_from_slice(&page_id.to_le_bytes());
366        }
367        Value::Secret(bytes) => {
368            out.push(DataType::Secret.to_byte());
369            write_varint(out, bytes.len() as u64);
370            out.extend_from_slice(bytes);
371        }
372        Value::Password(hash) => {
373            out.push(DataType::Password.to_byte());
374            let bytes = hash.as_bytes();
375            write_varint(out, bytes.len() as u64);
376            out.extend_from_slice(bytes);
377        }
378    }
379}
380
381/// Decode a single value from `data`, returning the value and the
382/// number of bytes consumed.
383pub fn decode(data: &[u8]) -> Result<(Value, usize), ValueError> {
384    if data.is_empty() {
385        return Err(ValueError::EmptyData);
386    }
387
388    let type_byte = data[0];
389    let mut offset = 1;
390
391    // Null marker
392    if type_byte == 0 {
393        return Ok((Value::Null, 1));
394    }
395
396    let data_type = DataType::from_byte(type_byte).ok_or(ValueError::InvalidType(type_byte))?;
397
398    let value = match data_type {
399        DataType::Integer => {
400            if data.len() < offset + 8 {
401                return Err(ValueError::TruncatedData);
402            }
403            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
404            offset += 8;
405            Value::Integer(v)
406        }
407        DataType::UnsignedInteger => {
408            if data.len() < offset + 8 {
409                return Err(ValueError::TruncatedData);
410            }
411            let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
412            offset += 8;
413            Value::UnsignedInteger(v)
414        }
415        DataType::Float => {
416            if data.len() < offset + 8 {
417                return Err(ValueError::TruncatedData);
418            }
419            let v = f64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
420            offset += 8;
421            Value::Float(v)
422        }
423        DataType::Text => {
424            let (len, varint_size) = read_varint(&data[offset..])?;
425            offset += varint_size;
426            if data.len() < offset + len as usize {
427                return Err(ValueError::TruncatedData);
428            }
429            let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
430                .map_err(|_| ValueError::InvalidUtf8)?;
431            offset += len as usize;
432            Value::text(s)
433        }
434        DataType::Blob => {
435            let (len, varint_size) = read_varint(&data[offset..])?;
436            offset += varint_size;
437            if data.len() < offset + len as usize {
438                return Err(ValueError::TruncatedData);
439            }
440            let blob = data[offset..offset + len as usize].to_vec();
441            offset += len as usize;
442            Value::Blob(blob)
443        }
444        DataType::Boolean => {
445            if data.len() < offset + 1 {
446                return Err(ValueError::TruncatedData);
447            }
448            let v = data[offset] != 0;
449            offset += 1;
450            Value::Boolean(v)
451        }
452        DataType::Timestamp => {
453            if data.len() < offset + 8 {
454                return Err(ValueError::TruncatedData);
455            }
456            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
457            offset += 8;
458            Value::Timestamp(v)
459        }
460        DataType::Duration => {
461            if data.len() < offset + 8 {
462                return Err(ValueError::TruncatedData);
463            }
464            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
465            offset += 8;
466            Value::Duration(v)
467        }
468        DataType::IpAddr => {
469            if data.len() < offset + 1 {
470                return Err(ValueError::TruncatedData);
471            }
472            let version = data[offset];
473            offset += 1;
474            match version {
475                4 => {
476                    if data.len() < offset + 4 {
477                        return Err(ValueError::TruncatedData);
478                    }
479                    let octets: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
480                    offset += 4;
481                    Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
482                }
483                6 => {
484                    if data.len() < offset + 16 {
485                        return Err(ValueError::TruncatedData);
486                    }
487                    let octets: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
488                    offset += 16;
489                    Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
490                }
491                _ => return Err(ValueError::InvalidIpVersion(version)),
492            }
493        }
494        DataType::MacAddr => {
495            if data.len() < offset + 6 {
496                return Err(ValueError::TruncatedData);
497            }
498            let mac: [u8; 6] = data[offset..offset + 6].try_into().unwrap();
499            offset += 6;
500            Value::MacAddr(mac)
501        }
502        DataType::Vector => {
503            let (len, varint_size) = read_varint(&data[offset..])?;
504            offset += varint_size;
505            let float_count = len as usize;
506            if data.len() < offset + float_count * 4 {
507                return Err(ValueError::TruncatedData);
508            }
509            let mut vec = Vec::with_capacity(float_count);
510            for _ in 0..float_count {
511                let v = f32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
512                offset += 4;
513                vec.push(v);
514            }
515            Value::Vector(vec)
516        }
517        DataType::Json => {
518            let (len, varint_size) = read_varint(&data[offset..])?;
519            offset += varint_size;
520            if data.len() < offset + len as usize {
521                return Err(ValueError::TruncatedData);
522            }
523            let json = data[offset..offset + len as usize].to_vec();
524            offset += len as usize;
525            Value::Json(json)
526        }
527        DataType::Uuid => {
528            if data.len() < offset + 16 {
529                return Err(ValueError::TruncatedData);
530            }
531            let uuid: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
532            offset += 16;
533            Value::Uuid(uuid)
534        }
535        DataType::NodeRef => {
536            let (len, len_bytes) = read_varint(&data[offset..])?;
537            offset += len_bytes;
538            if data.len() < offset + len as usize {
539                return Err(ValueError::TruncatedData);
540            }
541            let node_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
542            offset += len as usize;
543            Value::NodeRef(node_id)
544        }
545        DataType::EdgeRef => {
546            let (len, len_bytes) = read_varint(&data[offset..])?;
547            offset += len_bytes;
548            if data.len() < offset + len as usize {
549                return Err(ValueError::TruncatedData);
550            }
551            let edge_id = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
552            offset += len as usize;
553            Value::EdgeRef(edge_id)
554        }
555        DataType::VectorRef => {
556            let (len, len_bytes) = read_varint(&data[offset..])?;
557            offset += len_bytes;
558            if data.len() < offset + len as usize + 8 {
559                return Err(ValueError::TruncatedData);
560            }
561            let collection =
562                String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
563            offset += len as usize;
564            let vector_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
565            offset += 8;
566            Value::VectorRef(collection, vector_id)
567        }
568        DataType::RowRef => {
569            let (len, len_bytes) = read_varint(&data[offset..])?;
570            offset += len_bytes;
571            if data.len() < offset + len as usize + 8 {
572                return Err(ValueError::TruncatedData);
573            }
574            let table = String::from_utf8_lossy(&data[offset..offset + len as usize]).to_string();
575            offset += len as usize;
576            let row_id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
577            offset += 8;
578            Value::RowRef(table, row_id)
579        }
580        DataType::Color => {
581            if data.len() < offset + 3 {
582                return Err(ValueError::TruncatedData);
583            }
584            let rgb: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
585            offset += 3;
586            Value::Color(rgb)
587        }
588        DataType::Email => {
589            let (len, varint_size) = read_varint(&data[offset..])?;
590            offset += varint_size;
591            if data.len() < offset + len as usize {
592                return Err(ValueError::TruncatedData);
593            }
594            let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
595                .map_err(|_| ValueError::InvalidUtf8)?;
596            offset += len as usize;
597            Value::Email(s)
598        }
599        DataType::Url => {
600            let (len, varint_size) = read_varint(&data[offset..])?;
601            offset += varint_size;
602            if data.len() < offset + len as usize {
603                return Err(ValueError::TruncatedData);
604            }
605            let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
606                .map_err(|_| ValueError::InvalidUtf8)?;
607            offset += len as usize;
608            Value::Url(s)
609        }
610        DataType::Phone => {
611            if data.len() < offset + 8 {
612                return Err(ValueError::TruncatedData);
613            }
614            let v = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
615            offset += 8;
616            Value::Phone(v)
617        }
618        DataType::Semver => {
619            if data.len() < offset + 4 {
620                return Err(ValueError::TruncatedData);
621            }
622            let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
623            offset += 4;
624            Value::Semver(v)
625        }
626        DataType::Cidr => {
627            if data.len() < offset + 5 {
628                return Err(ValueError::TruncatedData);
629            }
630            let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
631            offset += 4;
632            let prefix = data[offset];
633            offset += 1;
634            Value::Cidr(ip, prefix)
635        }
636        DataType::Date => {
637            if data.len() < offset + 4 {
638                return Err(ValueError::TruncatedData);
639            }
640            let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
641            offset += 4;
642            Value::Date(v)
643        }
644        DataType::Time => {
645            if data.len() < offset + 4 {
646                return Err(ValueError::TruncatedData);
647            }
648            let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
649            offset += 4;
650            Value::Time(v)
651        }
652        DataType::Decimal => {
653            if data.len() < offset + 8 {
654                return Err(ValueError::TruncatedData);
655            }
656            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
657            offset += 8;
658            Value::Decimal(v)
659        }
660        DataType::Enum => {
661            if data.len() < offset + 1 {
662                return Err(ValueError::TruncatedData);
663            }
664            let idx = data[offset];
665            offset += 1;
666            Value::EnumValue(idx)
667        }
668        DataType::Array => {
669            let (len, varint_size) = read_varint(&data[offset..])?;
670            offset += varint_size;
671            let count = len as usize;
672            let mut elements = Vec::with_capacity(count);
673            for _ in 0..count {
674                let (elem, elem_size) = decode(&data[offset..])?;
675                offset += elem_size;
676                elements.push(elem);
677            }
678            Value::Array(elements)
679        }
680        DataType::TimestampMs => {
681            if data.len() < offset + 8 {
682                return Err(ValueError::TruncatedData);
683            }
684            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
685            offset += 8;
686            Value::TimestampMs(v)
687        }
688        DataType::Ipv4 => {
689            if data.len() < offset + 4 {
690                return Err(ValueError::TruncatedData);
691            }
692            let v = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
693            offset += 4;
694            Value::Ipv4(v)
695        }
696        DataType::Ipv6 => {
697            if data.len() < offset + 16 {
698                return Err(ValueError::TruncatedData);
699            }
700            let bytes: [u8; 16] = data[offset..offset + 16].try_into().unwrap();
701            offset += 16;
702            Value::Ipv6(bytes)
703        }
704        DataType::Subnet => {
705            if data.len() < offset + 8 {
706                return Err(ValueError::TruncatedData);
707            }
708            let ip = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
709            offset += 4;
710            let mask = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
711            offset += 4;
712            Value::Subnet(ip, mask)
713        }
714        DataType::Port => {
715            if data.len() < offset + 2 {
716                return Err(ValueError::TruncatedData);
717            }
718            let v = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap());
719            offset += 2;
720            Value::Port(v)
721        }
722        DataType::Latitude => {
723            if data.len() < offset + 4 {
724                return Err(ValueError::TruncatedData);
725            }
726            let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
727            offset += 4;
728            Value::Latitude(v)
729        }
730        DataType::Longitude => {
731            if data.len() < offset + 4 {
732                return Err(ValueError::TruncatedData);
733            }
734            let v = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
735            offset += 4;
736            Value::Longitude(v)
737        }
738        DataType::GeoPoint => {
739            if data.len() < offset + 8 {
740                return Err(ValueError::TruncatedData);
741            }
742            let lat = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
743            offset += 4;
744            let lon = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
745            offset += 4;
746            Value::GeoPoint(lat, lon)
747        }
748        DataType::Country2 => {
749            if data.len() < offset + 2 {
750                return Err(ValueError::TruncatedData);
751            }
752            let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
753            offset += 2;
754            Value::Country2(c)
755        }
756        DataType::Country3 => {
757            if data.len() < offset + 3 {
758                return Err(ValueError::TruncatedData);
759            }
760            let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
761            offset += 3;
762            Value::Country3(c)
763        }
764        DataType::Lang2 => {
765            if data.len() < offset + 2 {
766                return Err(ValueError::TruncatedData);
767            }
768            let c: [u8; 2] = data[offset..offset + 2].try_into().unwrap();
769            offset += 2;
770            Value::Lang2(c)
771        }
772        DataType::Lang5 => {
773            if data.len() < offset + 5 {
774                return Err(ValueError::TruncatedData);
775            }
776            let c: [u8; 5] = data[offset..offset + 5].try_into().unwrap();
777            offset += 5;
778            Value::Lang5(c)
779        }
780        DataType::Currency => {
781            if data.len() < offset + 3 {
782                return Err(ValueError::TruncatedData);
783            }
784            let c: [u8; 3] = data[offset..offset + 3].try_into().unwrap();
785            offset += 3;
786            Value::Currency(c)
787        }
788        DataType::AssetCode => {
789            let (len, len_bytes) = read_varint(&data[offset..])?;
790            offset += len_bytes;
791            if data.len() < offset + len as usize {
792                return Err(ValueError::TruncatedData);
793            }
794            let code = String::from_utf8(data[offset..offset + len as usize].to_vec())
795                .map_err(|_| ValueError::InvalidUtf8)?;
796            offset += len as usize;
797            Value::AssetCode(code)
798        }
799        DataType::Money => {
800            let (len, len_bytes) = read_varint(&data[offset..])?;
801            offset += len_bytes;
802            if data.len() < offset + len as usize + 1 + 8 {
803                return Err(ValueError::TruncatedData);
804            }
805            let asset_code = String::from_utf8(data[offset..offset + len as usize].to_vec())
806                .map_err(|_| ValueError::InvalidUtf8)?;
807            offset += len as usize;
808            let scale = data[offset];
809            offset += 1;
810            let minor_units = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
811            offset += 8;
812            Value::Money {
813                asset_code,
814                minor_units,
815                scale,
816            }
817        }
818        DataType::ColorAlpha => {
819            if data.len() < offset + 4 {
820                return Err(ValueError::TruncatedData);
821            }
822            let rgba: [u8; 4] = data[offset..offset + 4].try_into().unwrap();
823            offset += 4;
824            Value::ColorAlpha(rgba)
825        }
826        DataType::BigInt => {
827            if data.len() < offset + 8 {
828                return Err(ValueError::TruncatedData);
829            }
830            let v = i64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
831            offset += 8;
832            Value::BigInt(v)
833        }
834        DataType::KeyRef => {
835            let (col_len, col_varint) = read_varint(&data[offset..])?;
836            offset += col_varint;
837            if data.len() < offset + col_len as usize {
838                return Err(ValueError::TruncatedData);
839            }
840            let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
841                .map_err(|_| ValueError::InvalidUtf8)?;
842            offset += col_len as usize;
843            let (key_len, key_varint) = read_varint(&data[offset..])?;
844            offset += key_varint;
845            if data.len() < offset + key_len as usize {
846                return Err(ValueError::TruncatedData);
847            }
848            let key = String::from_utf8(data[offset..offset + key_len as usize].to_vec())
849                .map_err(|_| ValueError::InvalidUtf8)?;
850            offset += key_len as usize;
851            Value::KeyRef(col, key)
852        }
853        DataType::DocRef => {
854            let (col_len, col_varint) = read_varint(&data[offset..])?;
855            offset += col_varint;
856            if data.len() < offset + col_len as usize + 8 {
857                return Err(ValueError::TruncatedData);
858            }
859            let col = String::from_utf8(data[offset..offset + col_len as usize].to_vec())
860                .map_err(|_| ValueError::InvalidUtf8)?;
861            offset += col_len as usize;
862            let id = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
863            offset += 8;
864            Value::DocRef(col, id)
865        }
866        DataType::TableRef => {
867            let (len, varint_size) = read_varint(&data[offset..])?;
868            offset += varint_size;
869            if data.len() < offset + len as usize {
870                return Err(ValueError::TruncatedData);
871            }
872            let name = String::from_utf8(data[offset..offset + len as usize].to_vec())
873                .map_err(|_| ValueError::InvalidUtf8)?;
874            offset += len as usize;
875            Value::TableRef(name)
876        }
877        DataType::PageRef => {
878            if data.len() < offset + 4 {
879                return Err(ValueError::TruncatedData);
880            }
881            let page_id = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
882            offset += 4;
883            Value::PageRef(page_id)
884        }
885        DataType::Secret => {
886            let (len, varint_size) = read_varint(&data[offset..])?;
887            offset += varint_size;
888            if data.len() < offset + len as usize {
889                return Err(ValueError::TruncatedData);
890            }
891            let bytes = data[offset..offset + len as usize].to_vec();
892            offset += len as usize;
893            Value::Secret(bytes)
894        }
895        DataType::Password => {
896            let (len, varint_size) = read_varint(&data[offset..])?;
897            offset += varint_size;
898            if data.len() < offset + len as usize {
899                return Err(ValueError::TruncatedData);
900            }
901            let hash = String::from_utf8(data[offset..offset + len as usize].to_vec())
902                .map_err(|_| ValueError::InvalidUtf8)?;
903            offset += len as usize;
904            Value::Password(hash)
905        }
906        DataType::Nullable => {
907            // Nullable without inner type means null
908            Value::Null
909        }
910        DataType::Unknown => {
911            // Polymorphic placeholder — never stored on disk.
912            // Reaching here means corrupted data or a bug; treat
913            // as null to stay forward-compatible.
914            Value::Null
915        }
916        // C3 TOAST: zstd-compressed Text — transparent decompression.
917        // Wire: encode writes TextZstd when text > TOAST_THRESHOLD and
918        // compression saves space; decode always materialises as Value::Text.
919        DataType::TextZstd => {
920            let (orig_len, vs1) = read_varint(&data[offset..])?;
921            offset += vs1;
922            let (comp_len, vs2) = read_varint(&data[offset..])?;
923            offset += vs2;
924            if data.len() < offset + comp_len as usize {
925                return Err(ValueError::TruncatedData);
926            }
927            let compressed = &data[offset..offset + comp_len as usize];
928            let mut decompressed = vec![0u8; orig_len as usize];
929            zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
930                .map_err(|_| ValueError::InvalidUtf8)?;
931            offset += comp_len as usize;
932            let s = String::from_utf8(decompressed).map_err(|_| ValueError::InvalidUtf8)?;
933            Value::text(s)
934        }
935        // C3 TOAST: zstd-compressed Blob — same pattern as TextZstd.
936        DataType::BlobZstd => {
937            let (orig_len, vs1) = read_varint(&data[offset..])?;
938            offset += vs1;
939            let (comp_len, vs2) = read_varint(&data[offset..])?;
940            offset += vs2;
941            if data.len() < offset + comp_len as usize {
942                return Err(ValueError::TruncatedData);
943            }
944            let compressed = &data[offset..offset + comp_len as usize];
945            let mut decompressed = vec![0u8; orig_len as usize];
946            zstd::bulk::decompress_to_buffer(compressed, &mut decompressed)
947                .map_err(|_| ValueError::InvalidUtf8)?;
948            offset += comp_len as usize;
949            Value::Blob(decompressed)
950        }
951    };
952
953    Ok((value, offset))
954}
955
#[cfg(test)]
mod tests {
    use super::*;

    /// Byte-layout pin for the canonical [`Value`] variants.
    ///
    /// Each expectation below is part of the on-disk contract. **A
    /// failure here means files persisted by an earlier build would
    /// no longer decode** — touch the expected bytes only as part of
    /// a deliberate format migration, never to make the test pass.
    ///
    /// Pinned variants: Null, Integer, Text, Boolean, Blob — the
    /// minimum five required by the codec registry contract.
    #[test]
    fn pinned_bytes() {
        // (value to encode, exact bytes expected, message on mismatch)
        let expectations: [(Value, Vec<u8>, &str); 5] = [
            // Null is the bare null marker.
            (Value::Null, vec![0x00], "Value::Null layout drifted"),
            // Integer(-1) is tag 0x01 followed by the i64, little-endian.
            (
                Value::Integer(-1),
                vec![0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF],
                "Value::Integer layout drifted",
            ),
            // Text("hi") is tag 0x04, varint length 2, then the UTF-8 bytes.
            (
                Value::text("hi"),
                vec![0x04, 0x02, b'h', b'i'],
                "Value::Text layout drifted",
            ),
            // Boolean(true) is tag 0x06 followed by 0x01.
            (
                Value::Boolean(true),
                vec![0x06, 0x01],
                "Value::Boolean layout drifted",
            ),
            // Blob([1, 2, 3]) is tag 0x05, varint length 3, then the raw bytes.
            (
                Value::Blob(vec![0x01, 0x02, 0x03]),
                vec![0x05, 0x03, 0x01, 0x02, 0x03],
                "Value::Blob layout drifted",
            ),
        ];

        for (value, expected, complaint) in expectations.iter() {
            let mut actual = Vec::new();
            encode(value, &mut actual);
            assert_eq!(&actual, expected, "{}", complaint);
        }
    }

    /// Cross-checks the registry's [`type_tag`] against
    /// [`DataType::to_byte`] for a sample of storable variants, and
    /// confirms that [`type_for_tag`] maps the tag back to the same
    /// [`DataType`]. `Value::Null` is the special case: its tag is
    /// asserted to be `0` directly.
    #[test]
    fn type_tag_matches_data_type_byte() {
        let probes = [
            Value::Null,
            Value::Integer(0),
            Value::UnsignedInteger(0),
            Value::Float(0.0),
            Value::text(""),
            Value::Blob(Vec::new()),
            Value::Boolean(false),
            Value::Timestamp(0),
            Value::Duration(0),
            Value::Uuid([0; 16]),
        ];

        for probe in probes.iter() {
            let tag = type_tag(probe);
            match probe {
                Value::Null => assert_eq!(tag, 0),
                _ => {
                    assert_eq!(tag, probe.data_type().to_byte());
                    let kind = type_for_tag(tag).expect("registered tag");
                    assert_eq!(kind, probe.data_type());
                }
            }
        }
    }

    /// An unrecognised type byte has to come back as an error, never
    /// as some default value — otherwise corrupted on-disk bytes
    /// could be mistaken for valid data.
    #[test]
    fn rejects_unknown_type_tag() {
        // No DataType is registered under 0xFF.
        let bogus: [u8; 1] = [0xFF];
        let failure = decode(&bogus).expect_err("unknown tag must error");
        assert!(matches!(failure, ValueError::InvalidType(0xFF)));
    }

    /// Short input must be reported through `ValueError` rather than
    /// a slice-index panic: an empty buffer yields `EmptyData`, and a
    /// payload cut off part-way yields `TruncatedData` on both the
    /// fixed-width and the length-prefixed paths.
    #[test]
    fn rejects_truncated_buffer() {
        // Nothing at all to read.
        let nothing: &[u8] = &[];
        assert!(matches!(decode(nothing), Err(ValueError::EmptyData)));

        // Fixed-width path: Integer wants 8 payload bytes, gets 3.
        let short_integer = [DataType::Integer.to_byte(), 0x01, 0x02, 0x03];
        assert!(matches!(
            decode(&short_integer),
            Err(ValueError::TruncatedData)
        ));

        // Length-prefixed path: Text announces 5 bytes, carries 2.
        let mut short_text = Vec::new();
        short_text.push(DataType::Text.to_byte());
        write_varint(&mut short_text, 5);
        short_text.extend_from_slice(b"ab");
        assert!(matches!(
            decode(&short_text),
            Err(ValueError::TruncatedData)
        ));
    }

    /// Encoding a value and decoding the result must yield an equal
    /// value, and the decoder must report having consumed the whole
    /// encoded buffer.
    #[test]
    fn round_trip_canonical_variants() {
        let originals = [
            Value::Null,
            Value::Integer(-12345),
            Value::text("hello"),
            Value::Boolean(true),
            Value::Blob(vec![1, 2, 3, 4, 5]),
        ];

        for original in originals.iter() {
            let mut bytes = Vec::new();
            encode(original, &mut bytes);

            let (recovered, consumed) = decode(&bytes).expect("decode");
            assert_eq!(consumed, bytes.len());
            assert_eq!(original, &recovered);
        }
    }
}