Skip to main content

modelvault_core/catalog/
codec.rs

1//! Binary encoding for catalog payloads embedded in `SegmentType::Schema` segments.
2
3use std::borrow::Cow;
4
5use crate::error::{DbError, FormatError};
6use crate::schema::{Constraint, FieldDef, FieldPath, IndexDef, IndexKind, Type};
7
/// Maximum UTF-8 length for a collection name (exclusive upper bound is 1024 bytes).
///
/// The decoder in this module applies the same limit to the optional primary-field name
/// and to index names.
pub const MAX_COLLECTION_NAME_BYTES: usize = 1023;

/// Legacy catalog payload (no primary key on create).
pub const CATALOG_PAYLOAD_VERSION_V1: u16 = 1;
/// Catalog with optional `primary_field` on create, no per-field constraints.
pub const CATALOG_PAYLOAD_VERSION_V2: u16 = 2;
/// Catalog with `primary_field` + [`FieldDef::constraints`], but no index definitions.
pub const CATALOG_PAYLOAD_VERSION_V3: u16 = 3;
/// Catalog with `indexes` definitions (secondary indexes).
pub const CATALOG_PAYLOAD_VERSION_V4: u16 = 4;
/// What [`encode_catalog_payload`] writes (latest).
pub const CATALOG_PAYLOAD_VERSION: u16 = CATALOG_PAYLOAD_VERSION_V4;

/// Entry-kind tag for a "create collection" record (written right after the payload version).
pub const ENTRY_KIND_CREATE_COLLECTION: u16 = 1;
/// Entry-kind tag for a "new schema version" record (written right after the payload version).
pub const ENTRY_KIND_NEW_SCHEMA_VERSION: u16 = 2;

/// Maximum nesting depth for `Type` when encoding/decoding (prevents stack overflow on hostile input).
pub const MAX_TYPE_NESTING_DEPTH: u32 = 32;

/// Maximum field definitions per schema version in a catalog payload.
pub const MAX_FIELDS_PER_SCHEMA: u32 = 4096;
30
/// Structural failures detected while decoding a catalog payload.
///
/// Converted into [`DbError`] through the `From` impl in this module. Every payload is an
/// integer, so the type is `Eq` as well as `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatalogDecodeError {
    /// The input ended before a requested value could be read in full.
    UnexpectedEof,
    /// The leading payload version is not one of the known `CATALOG_PAYLOAD_VERSION_V*` values.
    UnknownCatalogPayloadVersion { got: u16 },
    /// The entry-kind tag is neither "create collection" nor "new schema version".
    UnknownEntryKind { got: u16 },
    /// Bytes were left over after a complete record had been decoded.
    TrailingBytes,
    /// A `Type` was nested deeper than `max` levels.
    TypeNestingTooDeep { max: u32 },
    /// A length-prefixed string was not valid UTF-8.
    InvalidUtf8,
    /// A name's length prefix exceeded [`MAX_COLLECTION_NAME_BYTES`]; the decoder also
    /// reports this for an over-long primary-field name.
    CollectionNameTooLong { got: usize },
    /// The collection name had a zero length prefix.
    EmptyCollectionName,
    /// NOTE(review): not produced anywhere in this module; presumably raised by callers that
    /// validate the schema version of a create record — confirm.
    InvalidCreateSchemaVersion { got: u32 },
    /// An index name's length prefix exceeded [`MAX_COLLECTION_NAME_BYTES`].
    IndexNameTooLong { got: usize },
    /// An index name had a zero length prefix.
    EmptyIndexName,
    /// The one-byte index kind tag was not a known value.
    UnknownIndexKind { got: u8 },
}
46
47impl From<CatalogDecodeError> for DbError {
48    fn from(e: CatalogDecodeError) -> Self {
49        DbError::Format(FormatError::InvalidCatalogPayload {
50            message: format!("{e:?}"),
51        })
52    }
53}
54
55/// Encode a catalog record as segment payload bytes (current [`CATALOG_PAYLOAD_VERSION`]).
56pub fn encode_catalog_payload(record: &CatalogRecordWire) -> Vec<u8> {
57    let mut out = Vec::new();
58    out.extend_from_slice(&CATALOG_PAYLOAD_VERSION.to_le_bytes());
59    match record {
60        CatalogRecordWire::CreateCollection {
61            collection_id,
62            name,
63            schema_version,
64            fields,
65            indexes,
66            primary_field,
67        } => {
68            out.extend_from_slice(&ENTRY_KIND_CREATE_COLLECTION.to_le_bytes());
69            out.extend_from_slice(&collection_id.to_le_bytes());
70            encode_name(&mut out, name);
71            out.extend_from_slice(&schema_version.to_le_bytes());
72            encode_fields_v3(&mut out, fields);
73            encode_indexes(&mut out, indexes);
74            encode_optional_primary_name(&mut out, primary_field.as_deref());
75        }
76        CatalogRecordWire::NewSchemaVersion {
77            collection_id,
78            schema_version,
79            fields,
80            indexes,
81        } => {
82            out.extend_from_slice(&ENTRY_KIND_NEW_SCHEMA_VERSION.to_le_bytes());
83            out.extend_from_slice(&collection_id.to_le_bytes());
84            out.extend_from_slice(&schema_version.to_le_bytes());
85            encode_fields_v3(&mut out, fields);
86            encode_indexes(&mut out, indexes);
87        }
88    }
89    out
90}
91
/// Wire representation for encoding (mirrors on-disk entry kinds).
///
/// Produced by [`decode_catalog_payload`] and consumed by [`encode_catalog_payload`].
#[derive(Debug, Clone, PartialEq)]
pub enum CatalogRecordWire {
    /// Record written with entry kind [`ENTRY_KIND_CREATE_COLLECTION`].
    CreateCollection {
        collection_id: u32,
        /// Collection name; the decoder rejects empty names and names longer than
        /// [`MAX_COLLECTION_NAME_BYTES`].
        name: String,
        schema_version: u32,
        fields: Vec<FieldDef>,
        /// Index definitions; decoded as empty for payload versions before V4.
        indexes: Vec<IndexDef>,
        /// Top-level segment name for the primary key (`None` for legacy v1 catalog segments).
        primary_field: Option<String>,
    },
    /// Record written with entry kind [`ENTRY_KIND_NEW_SCHEMA_VERSION`].
    NewSchemaVersion {
        collection_id: u32,
        schema_version: u32,
        fields: Vec<FieldDef>,
        /// Index definitions; decoded as empty for payload versions before V4.
        indexes: Vec<IndexDef>,
    },
}
111
112pub fn decode_catalog_payload(bytes: &[u8]) -> Result<CatalogRecordWire, DbError> {
113    let mut cur = Cursor::new(bytes);
114    let ver = cur.take_u16()?;
115    if ver != CATALOG_PAYLOAD_VERSION_V1
116        && ver != CATALOG_PAYLOAD_VERSION_V2
117        && ver != CATALOG_PAYLOAD_VERSION_V3
118        && ver != CATALOG_PAYLOAD_VERSION_V4
119    {
120        return Err(CatalogDecodeError::UnknownCatalogPayloadVersion { got: ver }.into());
121    }
122    let kind = cur.take_u16()?;
123    match kind {
124        ENTRY_KIND_CREATE_COLLECTION => {
125            let collection_id = cur.take_u32()?;
126            let name = decode_name(&mut cur)?;
127            let schema_version = cur.take_u32()?;
128            let fields = decode_fields(&mut cur, ver)?;
129            let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
130                decode_indexes(&mut cur)?
131            } else {
132                Vec::new()
133            };
134            let primary_field = if ver >= CATALOG_PAYLOAD_VERSION_V2 {
135                decode_optional_primary_name(&mut cur)?
136            } else {
137                None
138            };
139            if cur.remaining() != 0 {
140                return Err(CatalogDecodeError::TrailingBytes.into());
141            }
142            Ok(CatalogRecordWire::CreateCollection {
143                collection_id,
144                name,
145                schema_version,
146                fields,
147                indexes,
148                primary_field,
149            })
150        }
151        ENTRY_KIND_NEW_SCHEMA_VERSION => {
152            let collection_id = cur.take_u32()?;
153            let schema_version = cur.take_u32()?;
154            let fields = decode_fields(&mut cur, ver)?;
155            let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
156                decode_indexes(&mut cur)?
157            } else {
158                Vec::new()
159            };
160            if cur.remaining() != 0 {
161                return Err(CatalogDecodeError::TrailingBytes.into());
162            }
163            Ok(CatalogRecordWire::NewSchemaVersion {
164                collection_id,
165                schema_version,
166                fields,
167                indexes,
168            })
169        }
170        _ => Err(CatalogDecodeError::UnknownEntryKind { got: kind }.into()),
171    }
172}
173
/// Append the optional primary-field name as a `u32` little-endian byte length followed by
/// the UTF-8 bytes. `None` is written as a zero length with no bytes after it.
fn encode_optional_primary_name(out: &mut Vec<u8>, primary: Option<&str>) {
    // An absent name and an empty name share the same wire form: a zero length prefix.
    let payload = primary.unwrap_or("").as_bytes();
    let len_prefix = (payload.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(payload);
}
184
185fn decode_optional_primary_name(cur: &mut Cursor<'_>) -> Result<Option<String>, DbError> {
186    let n = cur.take_u32()? as usize;
187    if n == 0 {
188        return Ok(None);
189    }
190    if n > MAX_COLLECTION_NAME_BYTES {
191        return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
192    }
193    let bytes = cur.take_bytes(n)?;
194    let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
195    Ok(Some(s))
196}
197
/// Append `name` as a `u32` little-endian byte length followed by its UTF-8 bytes.
fn encode_name(out: &mut Vec<u8>, name: &str) {
    let len_prefix = (name.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(name.as_bytes());
}
203
204fn decode_name(cur: &mut Cursor<'_>) -> Result<String, DbError> {
205    let n = cur.take_u32()? as usize;
206    if n == 0 {
207        return Err(CatalogDecodeError::EmptyCollectionName.into());
208    }
209    if n > MAX_COLLECTION_NAME_BYTES {
210        return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
211    }
212    let bytes = cur.take_bytes(n)?;
213    String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8.into())
214}
215
216fn encode_indexes(out: &mut Vec<u8>, indexes: &[IndexDef]) {
217    out.extend_from_slice(&(indexes.len() as u32).to_le_bytes());
218    for idx in indexes {
219        match idx.kind {
220            IndexKind::Unique => out.push(1),
221            IndexKind::NonUnique => out.push(2),
222        }
223        encode_field_path(out, &idx.path);
224        let b = idx.name.as_bytes();
225        out.extend_from_slice(&(b.len() as u32).to_le_bytes());
226        out.extend_from_slice(b);
227    }
228}
229
230fn decode_indexes(cur: &mut Cursor<'_>) -> Result<Vec<IndexDef>, DbError> {
231    let n = cur.take_u32()? as usize;
232    let mut v = Vec::with_capacity(n.min(1024));
233    for _ in 0..n {
234        let kind_tag = cur.take_u8()?;
235        let kind = match kind_tag {
236            1 => IndexKind::Unique,
237            2 => IndexKind::NonUnique,
238            _ => return Err(CatalogDecodeError::UnknownIndexKind { got: kind_tag }.into()),
239        };
240        let path = decode_field_path(cur)?;
241        let name_len = cur.take_u32()? as usize;
242        if name_len == 0 {
243            return Err(CatalogDecodeError::EmptyIndexName.into());
244        }
245        if name_len > MAX_COLLECTION_NAME_BYTES {
246            return Err(CatalogDecodeError::IndexNameTooLong { got: name_len }.into());
247        }
248        let bytes = cur.take_bytes(name_len)?;
249        let name = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
250        v.push(IndexDef { name, path, kind });
251    }
252    Ok(v)
253}
254
255fn encode_fields_v3(out: &mut Vec<u8>, fields: &[FieldDef]) {
256    out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
257    for f in fields {
258        encode_field_path(out, &f.path);
259        encode_type(out, &f.ty, 0);
260        encode_constraints(out, &f.constraints);
261    }
262}
263
264fn decode_fields(cur: &mut Cursor<'_>, catalog_ver: u16) -> Result<Vec<FieldDef>, DbError> {
265    let n = cur.take_u32()?;
266    if n > MAX_FIELDS_PER_SCHEMA {
267        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
268            message: format!("field count {n} exceeds maximum {MAX_FIELDS_PER_SCHEMA}"),
269        }));
270    }
271    let n = n as usize;
272    let mut v = Vec::with_capacity(n.min(1024));
273    for _ in 0..n {
274        let path = decode_field_path(cur)?;
275        let ty = decode_type(cur, 0)?;
276        let constraints = if catalog_ver >= CATALOG_PAYLOAD_VERSION_V3 {
277            decode_constraints(cur)?
278        } else {
279            Vec::new()
280        };
281        v.push(FieldDef {
282            path,
283            ty,
284            constraints,
285        });
286    }
287    Ok(v)
288}
289
// One-byte wire tags identifying each `Constraint` variant in an encoded constraint list.
// `encode_constraints` writes them and `decode_constraints` reads them back.
const CT_MIN_I64: u8 = 1;
const CT_MAX_I64: u8 = 2;
const CT_MIN_U64: u8 = 3;
const CT_MAX_U64: u8 = 4;
const CT_MIN_F64: u8 = 5;
const CT_MAX_F64: u8 = 6;
const CT_MIN_LEN: u8 = 7;
const CT_MAX_LEN: u8 = 8;
// Followed by a `u32` length and the pattern bytes.
const CT_REGEX: u8 = 9;
// The remaining tags carry no payload.
const CT_EMAIL: u8 = 10;
const CT_URL: u8 = 11;
const CT_NONEMPTY: u8 = 12;
302
303fn encode_constraints(out: &mut Vec<u8>, c: &[Constraint]) {
304    out.extend_from_slice(&(c.len() as u32).to_le_bytes());
305    for x in c {
306        match x {
307            Constraint::MinI64(n) => {
308                out.push(CT_MIN_I64);
309                out.extend_from_slice(&n.to_le_bytes());
310            }
311            Constraint::MaxI64(n) => {
312                out.push(CT_MAX_I64);
313                out.extend_from_slice(&n.to_le_bytes());
314            }
315            Constraint::MinU64(n) => {
316                out.push(CT_MIN_U64);
317                out.extend_from_slice(&n.to_le_bytes());
318            }
319            Constraint::MaxU64(n) => {
320                out.push(CT_MAX_U64);
321                out.extend_from_slice(&n.to_le_bytes());
322            }
323            Constraint::MinF64(n) => {
324                out.push(CT_MIN_F64);
325                out.extend_from_slice(&n.to_le_bytes());
326            }
327            Constraint::MaxF64(n) => {
328                out.push(CT_MAX_F64);
329                out.extend_from_slice(&n.to_le_bytes());
330            }
331            Constraint::MinLength(n) => {
332                out.push(CT_MIN_LEN);
333                out.extend_from_slice(&n.to_le_bytes());
334            }
335            Constraint::MaxLength(n) => {
336                out.push(CT_MAX_LEN);
337                out.extend_from_slice(&n.to_le_bytes());
338            }
339            Constraint::Regex(s) => {
340                out.push(CT_REGEX);
341                let b = s.as_bytes();
342                out.extend_from_slice(&(b.len() as u32).to_le_bytes());
343                out.extend_from_slice(b);
344            }
345            Constraint::Email => out.push(CT_EMAIL),
346            Constraint::Url => out.push(CT_URL),
347            Constraint::NonEmpty => out.push(CT_NONEMPTY),
348        }
349    }
350}
351
352fn decode_constraints(cur: &mut Cursor<'_>) -> Result<Vec<Constraint>, DbError> {
353    let n = cur.take_u32()? as usize;
354    let mut v = Vec::with_capacity(n.min(4096));
355    for _ in 0..n {
356        let tag = cur.take_u8()?;
357        let c = match tag {
358            CT_MIN_I64 => Constraint::MinI64(cur.take_i64()?),
359            CT_MAX_I64 => Constraint::MaxI64(cur.take_i64()?),
360            CT_MIN_U64 => Constraint::MinU64(cur.take_u64()?),
361            CT_MAX_U64 => Constraint::MaxU64(cur.take_u64()?),
362            CT_MIN_F64 => Constraint::MinF64(f64::from_bits(cur.take_u64()?)),
363            CT_MAX_F64 => Constraint::MaxF64(f64::from_bits(cur.take_u64()?)),
364            CT_MIN_LEN => Constraint::MinLength(cur.take_u64()?),
365            CT_MAX_LEN => Constraint::MaxLength(cur.take_u64()?),
366            CT_REGEX => {
367                let len = cur.take_u32()? as usize;
368                let bytes = cur.take_bytes(len)?;
369                Constraint::Regex(
370                    String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?,
371                )
372            }
373            CT_EMAIL => Constraint::Email,
374            CT_URL => Constraint::Url,
375            CT_NONEMPTY => Constraint::NonEmpty,
376            _ => {
377                return Err(DbError::Format(FormatError::InvalidCatalogPayload {
378                    message: format!("unknown constraint tag {tag}"),
379                }))
380            }
381        };
382        v.push(c);
383    }
384    Ok(v)
385}
386
387fn encode_field_path(out: &mut Vec<u8>, path: &FieldPath) {
388    let parts = &path.0;
389    out.extend_from_slice(&(parts.len() as u32).to_le_bytes());
390    for p in parts {
391        let b = p.as_bytes();
392        out.extend_from_slice(&(b.len() as u32).to_le_bytes());
393        out.extend_from_slice(b);
394    }
395}
396
397fn decode_field_path(cur: &mut Cursor<'_>) -> Result<FieldPath, DbError> {
398    let n = cur.take_u32()? as usize;
399    if n == 0 {
400        return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
401    }
402    let mut parts = Vec::with_capacity(n.min(64));
403    for _ in 0..n {
404        let len = cur.take_u32()? as usize;
405        let bytes = cur.take_bytes(len)?;
406        let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
407        if s.is_empty() {
408            return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
409        }
410        parts.push(Cow::Owned(s));
411    }
412    Ok(FieldPath(parts))
413}
414
// One-byte wire tags identifying each `Type` variant.
// `encode_type` writes them and `decode_type` reads them back.
// Tags 0-7 are scalar types with no payload.
const TAG_BOOL: u8 = 0;
const TAG_INT64: u8 = 1;
const TAG_UINT64: u8 = 2;
const TAG_FLOAT64: u8 = 3;
const TAG_STRING: u8 = 4;
const TAG_BYTES: u8 = 5;
const TAG_UUID: u8 = 6;
const TAG_TIMESTAMP: u8 = 7;
// Followed by exactly one nested type.
const TAG_OPTIONAL: u8 = 8;
const TAG_LIST: u8 = 9;
// Followed by a `u32` field count, then (field path, type) pairs.
const TAG_OBJECT: u8 = 10;
// Followed by a `u32` variant count, then length-prefixed variant names.
const TAG_ENUM: u8 = 11;
427
428// `depth` is only read when recursing into nested types; clippy does not see cross-call use.
429#[allow(clippy::only_used_in_recursion)]
430fn encode_type(out: &mut Vec<u8>, ty: &Type, depth: u32) {
431    match ty {
432        Type::Bool => out.push(TAG_BOOL),
433        Type::Int64 => out.push(TAG_INT64),
434        Type::Uint64 => out.push(TAG_UINT64),
435        Type::Float64 => out.push(TAG_FLOAT64),
436        Type::String => out.push(TAG_STRING),
437        Type::Bytes => out.push(TAG_BYTES),
438        Type::Uuid => out.push(TAG_UUID),
439        Type::Timestamp => out.push(TAG_TIMESTAMP),
440        Type::Optional(inner) => {
441            out.push(TAG_OPTIONAL);
442            encode_type(out, inner, depth + 1);
443        }
444        Type::List(inner) => {
445            out.push(TAG_LIST);
446            encode_type(out, inner, depth + 1);
447        }
448        Type::Object(fields) => {
449            out.push(TAG_OBJECT);
450            out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
451            for f in fields {
452                encode_field_path(out, &f.path);
453                encode_type(out, &f.ty, depth + 1);
454            }
455        }
456        Type::Enum(variants) => {
457            out.push(TAG_ENUM);
458            out.extend_from_slice(&(variants.len() as u32).to_le_bytes());
459            for s in variants {
460                let b = s.as_bytes();
461                out.extend_from_slice(&(b.len() as u32).to_le_bytes());
462                out.extend_from_slice(b);
463            }
464        }
465    }
466}
467
468fn decode_type(cur: &mut Cursor<'_>, depth: u32) -> Result<Type, DbError> {
469    if depth > MAX_TYPE_NESTING_DEPTH {
470        return Err(CatalogDecodeError::TypeNestingTooDeep {
471            max: MAX_TYPE_NESTING_DEPTH,
472        }
473        .into());
474    }
475    let tag = cur.take_u8()?;
476    Ok(match tag {
477        TAG_BOOL => Type::Bool,
478        TAG_INT64 => Type::Int64,
479        TAG_UINT64 => Type::Uint64,
480        TAG_FLOAT64 => Type::Float64,
481        TAG_STRING => Type::String,
482        TAG_BYTES => Type::Bytes,
483        TAG_UUID => Type::Uuid,
484        TAG_TIMESTAMP => Type::Timestamp,
485        TAG_OPTIONAL => Type::Optional(Box::new(decode_type(cur, depth + 1)?)),
486        TAG_LIST => Type::List(Box::new(decode_type(cur, depth + 1)?)),
487        TAG_OBJECT => {
488            let n = cur.take_u32()? as usize;
489            let mut fields = Vec::with_capacity(n.min(1024));
490            for _ in 0..n {
491                let path = decode_field_path(cur)?;
492                let ty = decode_type(cur, depth + 1)?;
493                fields.push(FieldDef {
494                    path,
495                    ty,
496                    constraints: Vec::new(),
497                });
498            }
499            Type::Object(fields)
500        }
501        TAG_ENUM => {
502            let n = cur.take_u32()? as usize;
503            let mut variants = Vec::with_capacity(n.min(1024));
504            for _ in 0..n {
505                let len = cur.take_u32()? as usize;
506                let bytes = cur.take_bytes(len)?;
507                let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
508                variants.push(s);
509            }
510            Type::Enum(variants)
511        }
512        _ => {
513            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
514                message: format!("unknown type tag {tag}"),
515            }))
516        }
517    })
518}
519
520struct Cursor<'a> {
521    bytes: &'a [u8],
522    pos: usize,
523}
524
525impl<'a> Cursor<'a> {
526    fn new(bytes: &'a [u8]) -> Self {
527        Self { bytes, pos: 0 }
528    }
529
530    fn remaining(&self) -> usize {
531        self.bytes.len().saturating_sub(self.pos)
532    }
533
534    fn take_u8(&mut self) -> Result<u8, DbError> {
535        if self.pos >= self.bytes.len() {
536            return Err(CatalogDecodeError::UnexpectedEof.into());
537        }
538        let b = self.bytes[self.pos];
539        self.pos += 1;
540        Ok(b)
541    }
542
543    fn take_u16(&mut self) -> Result<u16, DbError> {
544        if self.remaining() < 2 {
545            return Err(CatalogDecodeError::UnexpectedEof.into());
546        }
547        let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
548        self.pos += 2;
549        Ok(v)
550    }
551
552    fn take_u32(&mut self) -> Result<u32, DbError> {
553        if self.remaining() < 4 {
554            return Err(CatalogDecodeError::UnexpectedEof.into());
555        }
556        let v = u32::from_le_bytes([
557            self.bytes[self.pos],
558            self.bytes[self.pos + 1],
559            self.bytes[self.pos + 2],
560            self.bytes[self.pos + 3],
561        ]);
562        self.pos += 4;
563        Ok(v)
564    }
565
566    fn take_u64(&mut self) -> Result<u64, DbError> {
567        if self.remaining() < 8 {
568            return Err(CatalogDecodeError::UnexpectedEof.into());
569        }
570        let mut b = [0u8; 8];
571        b.copy_from_slice(&self.bytes[self.pos..self.pos + 8]);
572        self.pos += 8;
573        Ok(u64::from_le_bytes(b))
574    }
575
576    fn take_i64(&mut self) -> Result<i64, DbError> {
577        Ok(self.take_u64()? as i64)
578    }
579
580    fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
581        if self.remaining() < n {
582            return Err(CatalogDecodeError::UnexpectedEof.into());
583        }
584        let slice = &self.bytes[self.pos..self.pos + n];
585        self.pos += n;
586        Ok(slice.to_vec())
587    }
588}
589
// The unit tests live in `tests/unit/src_catalog_codec_tests.rs` under the crate root and
// are spliced in textually with `include!`, so they compile as the body of this `tests`
// module (test builds only).
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_catalog_codec_tests.rs"
    ));
}