Skip to main content

modelvault_core/catalog/
codec.rs

1//! Binary encoding for catalog payloads embedded in `SegmentType::Schema` segments.
2
3use std::borrow::Cow;
4
5use crate::error::{DbError, FormatError};
6use crate::file_format::check_decode_entry_count;
7use crate::schema::{Constraint, FieldDef, FieldPath, IndexDef, IndexKind, Type};
8
/// Maximum UTF-8 byte length for a collection name (names must be shorter than 1024 bytes).
///
/// The decoder applies the same limit to the optional primary-field name and to index names.
pub const MAX_COLLECTION_NAME_BYTES: usize = 1023;

/// Legacy catalog payload: create-collection entries carry no primary field.
pub const CATALOG_PAYLOAD_VERSION_V1: u16 = 1;
/// Adds the optional `primary_field` to create-collection entries; fields carry no constraints.
pub const CATALOG_PAYLOAD_VERSION_V2: u16 = 2;
/// Adds per-field [`FieldDef::constraints`] on top of V2.
pub const CATALOG_PAYLOAD_VERSION_V3: u16 = 3;
/// Adds secondary-index ([`IndexDef`]) definitions on top of V3.
pub const CATALOG_PAYLOAD_VERSION_V4: u16 = 4;
/// Version written by [`encode_catalog_payload`]: the latest one, currently V4.
pub const CATALOG_PAYLOAD_VERSION: u16 = CATALOG_PAYLOAD_VERSION_V4;

/// Entry-kind tag (the `u16` following the payload version) for a create-collection record.
pub const ENTRY_KIND_CREATE_COLLECTION: u16 = 1;
/// Entry-kind tag for a new-schema-version record.
pub const ENTRY_KIND_NEW_SCHEMA_VERSION: u16 = 2;

/// Maximum `Type` nesting depth accepted when decoding (guards against stack overflow on
/// hostile input). The encoder does not check this limit.
pub const MAX_TYPE_NESTING_DEPTH: u32 = 32;

/// Maximum number of top-level field definitions per schema version accepted by the decoder.
pub const MAX_FIELDS_PER_SCHEMA: u32 = 4096;
31
/// Structured reason a catalog payload failed to decode.
///
/// Converted into [`DbError`] through the `From` impl in this module, so decoding helpers can
/// propagate it with `?`.
#[derive(Debug, Clone, PartialEq)]
pub enum CatalogDecodeError {
    /// The payload ended before a value could be read in full.
    UnexpectedEof,
    /// The leading `u16` is not a supported catalog payload version.
    UnknownCatalogPayloadVersion { got: u16 },
    /// The `u16` entry kind is not a known `ENTRY_KIND_*` value.
    UnknownEntryKind { got: u16 },
    /// Bytes remained after a complete record was decoded.
    TrailingBytes,
    /// A `Type` was nested deeper than `max` levels.
    TypeNestingTooDeep { max: u32 },
    /// A length-prefixed string was not valid UTF-8.
    InvalidUtf8,
    /// A collection name (or the optional primary-field name) exceeded
    /// [`MAX_COLLECTION_NAME_BYTES`]; `got` is its byte length.
    CollectionNameTooLong { got: usize },
    /// A collection name had length zero.
    EmptyCollectionName,
    /// NOTE(review): not raised by the decoder in this file; presumably signals an unacceptable
    /// schema version on a create-collection entry. Confirm at the call site.
    InvalidCreateSchemaVersion { got: u32 },
    /// An index name exceeded [`MAX_COLLECTION_NAME_BYTES`]; `got` is its byte length.
    IndexNameTooLong { got: usize },
    /// An index name had length zero.
    EmptyIndexName,
    /// An index kind byte other than `1` (unique) or `2` (non-unique).
    UnknownIndexKind { got: u8 },
}

impl std::fmt::Display for CatalogDecodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnexpectedEof => f.write_str("unexpected end of catalog payload"),
            Self::UnknownCatalogPayloadVersion { got } => {
                write!(f, "unknown catalog payload version {got}")
            }
            Self::UnknownEntryKind { got } => write!(f, "unknown catalog entry kind {got}"),
            Self::TrailingBytes => f.write_str("trailing bytes after catalog record"),
            Self::TypeNestingTooDeep { max } => {
                write!(f, "type nesting exceeds maximum depth {max}")
            }
            Self::InvalidUtf8 => f.write_str("invalid UTF-8 in catalog string"),
            Self::CollectionNameTooLong { got } => {
                write!(f, "collection name too long ({got} bytes)")
            }
            Self::EmptyCollectionName => f.write_str("empty collection name"),
            Self::InvalidCreateSchemaVersion { got } => {
                write!(f, "invalid schema version {got} for create-collection entry")
            }
            Self::IndexNameTooLong { got } => write!(f, "index name too long ({got} bytes)"),
            Self::EmptyIndexName => f.write_str("empty index name"),
            Self::UnknownIndexKind { got } => write!(f, "unknown index kind {got}"),
        }
    }
}

impl std::error::Error for CatalogDecodeError {}
47
48impl From<CatalogDecodeError> for DbError {
49    fn from(e: CatalogDecodeError) -> Self {
50        DbError::Format(FormatError::InvalidCatalogPayload {
51            message: format!("{e:?}"),
52        })
53    }
54}
55
56/// Encode a catalog record as segment payload bytes (current [`CATALOG_PAYLOAD_VERSION`]).
57pub fn encode_catalog_payload(record: &CatalogRecordWire) -> Vec<u8> {
58    let mut out = Vec::new();
59    out.extend_from_slice(&CATALOG_PAYLOAD_VERSION.to_le_bytes());
60    match record {
61        CatalogRecordWire::CreateCollection {
62            collection_id,
63            name,
64            schema_version,
65            fields,
66            indexes,
67            primary_field,
68        } => {
69            out.extend_from_slice(&ENTRY_KIND_CREATE_COLLECTION.to_le_bytes());
70            out.extend_from_slice(&collection_id.to_le_bytes());
71            encode_name(&mut out, name);
72            out.extend_from_slice(&schema_version.to_le_bytes());
73            encode_fields_v3(&mut out, fields);
74            encode_indexes(&mut out, indexes);
75            encode_optional_primary_name(&mut out, primary_field.as_deref());
76        }
77        CatalogRecordWire::NewSchemaVersion {
78            collection_id,
79            schema_version,
80            fields,
81            indexes,
82        } => {
83            out.extend_from_slice(&ENTRY_KIND_NEW_SCHEMA_VERSION.to_le_bytes());
84            out.extend_from_slice(&collection_id.to_le_bytes());
85            out.extend_from_slice(&schema_version.to_le_bytes());
86            encode_fields_v3(&mut out, fields);
87            encode_indexes(&mut out, indexes);
88        }
89    }
90    out
91}
92
93/// Wire representation for encoding (mirrors on-disk entry kinds).
94#[derive(Debug, Clone, PartialEq)]
95pub enum CatalogRecordWire {
96    CreateCollection {
97        collection_id: u32,
98        name: String,
99        schema_version: u32,
100        fields: Vec<FieldDef>,
101        indexes: Vec<IndexDef>,
102        /// Top-level segment name for the primary key (`None` for legacy v1 catalog segments).
103        primary_field: Option<String>,
104    },
105    NewSchemaVersion {
106        collection_id: u32,
107        schema_version: u32,
108        fields: Vec<FieldDef>,
109        indexes: Vec<IndexDef>,
110    },
111}
112
113pub fn decode_catalog_payload(bytes: &[u8]) -> Result<CatalogRecordWire, DbError> {
114    let mut cur = Cursor::new(bytes);
115    let ver = cur.take_u16()?;
116    if ver != CATALOG_PAYLOAD_VERSION_V1
117        && ver != CATALOG_PAYLOAD_VERSION_V2
118        && ver != CATALOG_PAYLOAD_VERSION_V3
119        && ver != CATALOG_PAYLOAD_VERSION_V4
120    {
121        return Err(CatalogDecodeError::UnknownCatalogPayloadVersion { got: ver }.into());
122    }
123    let kind = cur.take_u16()?;
124    match kind {
125        ENTRY_KIND_CREATE_COLLECTION => {
126            let collection_id = cur.take_u32()?;
127            let name = decode_name(&mut cur)?;
128            let schema_version = cur.take_u32()?;
129            let fields = decode_fields(&mut cur, ver)?;
130            let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
131                decode_indexes(&mut cur)?
132            } else {
133                Vec::new()
134            };
135            let primary_field = if ver >= CATALOG_PAYLOAD_VERSION_V2 {
136                decode_optional_primary_name(&mut cur)?
137            } else {
138                None
139            };
140            if cur.remaining() != 0 {
141                return Err(CatalogDecodeError::TrailingBytes.into());
142            }
143            Ok(CatalogRecordWire::CreateCollection {
144                collection_id,
145                name,
146                schema_version,
147                fields,
148                indexes,
149                primary_field,
150            })
151        }
152        ENTRY_KIND_NEW_SCHEMA_VERSION => {
153            let collection_id = cur.take_u32()?;
154            let schema_version = cur.take_u32()?;
155            let fields = decode_fields(&mut cur, ver)?;
156            let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
157                decode_indexes(&mut cur)?
158            } else {
159                Vec::new()
160            };
161            if cur.remaining() != 0 {
162                return Err(CatalogDecodeError::TrailingBytes.into());
163            }
164            Ok(CatalogRecordWire::NewSchemaVersion {
165                collection_id,
166                schema_version,
167                fields,
168                indexes,
169            })
170        }
171        _ => Err(CatalogDecodeError::UnknownEntryKind { got: kind }.into()),
172    }
173}
174
/// Append an optional primary-field name as a `u32` little-endian byte length followed by its
/// UTF-8 bytes; `None` is written as a bare zero length.
///
/// NOTE: `Some("")` yields exactly the same bytes as `None`, and the decoder reads a zero length
/// back as `None`.
fn encode_optional_primary_name(out: &mut Vec<u8>, primary: Option<&str>) {
    let bytes = primary.unwrap_or("").as_bytes();
    out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
    out.extend_from_slice(bytes);
}
185
186fn decode_optional_primary_name(cur: &mut Cursor<'_>) -> Result<Option<String>, DbError> {
187    let n = cur.take_u32()? as usize;
188    if n == 0 {
189        return Ok(None);
190    }
191    if n > MAX_COLLECTION_NAME_BYTES {
192        return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
193    }
194    let bytes = cur.take_bytes(n)?;
195    let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
196    Ok(Some(s))
197}
198
/// Append `name` as a `u32` little-endian byte length followed by its UTF-8 bytes.
///
/// No length validation happens here. The decoder rejects empty names and names longer than
/// [`MAX_COLLECTION_NAME_BYTES`].
fn encode_name(out: &mut Vec<u8>, name: &str) {
    let len_prefix = (name.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(name.as_bytes());
}
204
205fn decode_name(cur: &mut Cursor<'_>) -> Result<String, DbError> {
206    let n = cur.take_u32()? as usize;
207    if n == 0 {
208        return Err(CatalogDecodeError::EmptyCollectionName.into());
209    }
210    if n > MAX_COLLECTION_NAME_BYTES {
211        return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
212    }
213    let bytes = cur.take_bytes(n)?;
214    String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8.into())
215}
216
217fn encode_indexes(out: &mut Vec<u8>, indexes: &[IndexDef]) {
218    out.extend_from_slice(&(indexes.len() as u32).to_le_bytes());
219    for idx in indexes {
220        match idx.kind {
221            IndexKind::Unique => out.push(1),
222            IndexKind::NonUnique => out.push(2),
223        }
224        encode_field_path(out, &idx.path);
225        let b = idx.name.as_bytes();
226        out.extend_from_slice(&(b.len() as u32).to_le_bytes());
227        out.extend_from_slice(b);
228    }
229}
230
231fn decode_indexes(cur: &mut Cursor<'_>) -> Result<Vec<IndexDef>, DbError> {
232    let n = cur.take_u32()? as usize;
233    check_decode_entry_count(n)?;
234    let mut v = Vec::with_capacity(n.min(1024));
235    for _ in 0..n {
236        let kind_tag = cur.take_u8()?;
237        let kind = match kind_tag {
238            1 => IndexKind::Unique,
239            2 => IndexKind::NonUnique,
240            _ => return Err(CatalogDecodeError::UnknownIndexKind { got: kind_tag }.into()),
241        };
242        let path = decode_field_path(cur)?;
243        let name_len = cur.take_u32()? as usize;
244        if name_len == 0 {
245            return Err(CatalogDecodeError::EmptyIndexName.into());
246        }
247        if name_len > MAX_COLLECTION_NAME_BYTES {
248            return Err(CatalogDecodeError::IndexNameTooLong { got: name_len }.into());
249        }
250        let bytes = cur.take_bytes(name_len)?;
251        let name = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
252        v.push(IndexDef { name, path, kind });
253    }
254    Ok(v)
255}
256
257fn encode_fields_v3(out: &mut Vec<u8>, fields: &[FieldDef]) {
258    out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
259    for f in fields {
260        encode_field_path(out, &f.path);
261        encode_type(out, &f.ty, 0);
262        encode_constraints(out, &f.constraints);
263    }
264}
265
266fn decode_fields(cur: &mut Cursor<'_>, catalog_ver: u16) -> Result<Vec<FieldDef>, DbError> {
267    let n = cur.take_u32()?;
268    if n > MAX_FIELDS_PER_SCHEMA {
269        return Err(DbError::Format(FormatError::InvalidCatalogPayload {
270            message: format!("field count {n} exceeds maximum {MAX_FIELDS_PER_SCHEMA}"),
271        }));
272    }
273    let n = n as usize;
274    let mut v = Vec::with_capacity(n.min(1024));
275    for _ in 0..n {
276        let path = decode_field_path(cur)?;
277        let ty = decode_type(cur, 0)?;
278        let constraints = if catalog_ver >= CATALOG_PAYLOAD_VERSION_V3 {
279            decode_constraints(cur)?
280        } else {
281            Vec::new()
282        };
283        v.push(FieldDef {
284            path,
285            ty,
286            constraints,
287        });
288    }
289    Ok(v)
290}
291
// Constraint tags: the leading byte of each encoded `Constraint`. These values are written into
// catalog payloads and matched on decode, so changing one would make previously written
// payloads decode differently.
const CT_MIN_I64: u8 = 0x01;
const CT_MAX_I64: u8 = 0x02;
const CT_MIN_U64: u8 = 0x03;
const CT_MAX_U64: u8 = 0x04;
const CT_MIN_F64: u8 = 0x05;
const CT_MAX_F64: u8 = 0x06;
const CT_MIN_LEN: u8 = 0x07;
const CT_MAX_LEN: u8 = 0x08;
const CT_REGEX: u8 = 0x09;
const CT_EMAIL: u8 = 0x0A;
const CT_URL: u8 = 0x0B;
const CT_NONEMPTY: u8 = 0x0C;
304
305fn encode_constraints(out: &mut Vec<u8>, c: &[Constraint]) {
306    out.extend_from_slice(&(c.len() as u32).to_le_bytes());
307    for x in c {
308        match x {
309            Constraint::MinI64(n) => {
310                out.push(CT_MIN_I64);
311                out.extend_from_slice(&n.to_le_bytes());
312            }
313            Constraint::MaxI64(n) => {
314                out.push(CT_MAX_I64);
315                out.extend_from_slice(&n.to_le_bytes());
316            }
317            Constraint::MinU64(n) => {
318                out.push(CT_MIN_U64);
319                out.extend_from_slice(&n.to_le_bytes());
320            }
321            Constraint::MaxU64(n) => {
322                out.push(CT_MAX_U64);
323                out.extend_from_slice(&n.to_le_bytes());
324            }
325            Constraint::MinF64(n) => {
326                out.push(CT_MIN_F64);
327                out.extend_from_slice(&n.to_le_bytes());
328            }
329            Constraint::MaxF64(n) => {
330                out.push(CT_MAX_F64);
331                out.extend_from_slice(&n.to_le_bytes());
332            }
333            Constraint::MinLength(n) => {
334                out.push(CT_MIN_LEN);
335                out.extend_from_slice(&n.to_le_bytes());
336            }
337            Constraint::MaxLength(n) => {
338                out.push(CT_MAX_LEN);
339                out.extend_from_slice(&n.to_le_bytes());
340            }
341            Constraint::Regex(s) => {
342                out.push(CT_REGEX);
343                let b = s.as_bytes();
344                out.extend_from_slice(&(b.len() as u32).to_le_bytes());
345                out.extend_from_slice(b);
346            }
347            Constraint::Email => out.push(CT_EMAIL),
348            Constraint::Url => out.push(CT_URL),
349            Constraint::NonEmpty => out.push(CT_NONEMPTY),
350        }
351    }
352}
353
354fn decode_constraints(cur: &mut Cursor<'_>) -> Result<Vec<Constraint>, DbError> {
355    let n = cur.take_u32()? as usize;
356    check_decode_entry_count(n)?;
357    let mut v = Vec::with_capacity(n.min(4096));
358    for _ in 0..n {
359        let tag = cur.take_u8()?;
360        let c = match tag {
361            CT_MIN_I64 => Constraint::MinI64(cur.take_i64()?),
362            CT_MAX_I64 => Constraint::MaxI64(cur.take_i64()?),
363            CT_MIN_U64 => Constraint::MinU64(cur.take_u64()?),
364            CT_MAX_U64 => Constraint::MaxU64(cur.take_u64()?),
365            CT_MIN_F64 => Constraint::MinF64(f64::from_bits(cur.take_u64()?)),
366            CT_MAX_F64 => Constraint::MaxF64(f64::from_bits(cur.take_u64()?)),
367            CT_MIN_LEN => Constraint::MinLength(cur.take_u64()?),
368            CT_MAX_LEN => Constraint::MaxLength(cur.take_u64()?),
369            CT_REGEX => {
370                let len = cur.take_u32()? as usize;
371                let bytes = cur.take_bytes(len)?;
372                Constraint::Regex(
373                    String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?,
374                )
375            }
376            CT_EMAIL => Constraint::Email,
377            CT_URL => Constraint::Url,
378            CT_NONEMPTY => Constraint::NonEmpty,
379            _ => {
380                return Err(DbError::Format(FormatError::InvalidCatalogPayload {
381                    message: format!("unknown constraint tag {tag}"),
382                }))
383            }
384        };
385        v.push(c);
386    }
387    Ok(v)
388}
389
390fn encode_field_path(out: &mut Vec<u8>, path: &FieldPath) {
391    let parts = &path.0;
392    out.extend_from_slice(&(parts.len() as u32).to_le_bytes());
393    for p in parts {
394        let b = p.as_bytes();
395        out.extend_from_slice(&(b.len() as u32).to_le_bytes());
396        out.extend_from_slice(b);
397    }
398}
399
400fn decode_field_path(cur: &mut Cursor<'_>) -> Result<FieldPath, DbError> {
401    let n = cur.take_u32()? as usize;
402    if n == 0 {
403        return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
404    }
405    let mut parts = Vec::with_capacity(n.min(64));
406    for _ in 0..n {
407        let len = cur.take_u32()? as usize;
408        let bytes = cur.take_bytes(len)?;
409        let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
410        if s.is_empty() {
411            return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
412        }
413        parts.push(Cow::Owned(s));
414    }
415    Ok(FieldPath(parts))
416}
417
// Type tags: the leading byte of each encoded `Type`. They are written into catalog payloads
// and matched by `decode_type`, so existing values must keep their meaning.
const TAG_BOOL: u8 = 0x00;
const TAG_INT64: u8 = 0x01;
const TAG_UINT64: u8 = 0x02;
const TAG_FLOAT64: u8 = 0x03;
const TAG_STRING: u8 = 0x04;
const TAG_BYTES: u8 = 0x05;
const TAG_UUID: u8 = 0x06;
const TAG_TIMESTAMP: u8 = 0x07;
const TAG_OPTIONAL: u8 = 0x08;
const TAG_LIST: u8 = 0x09;
const TAG_OBJECT: u8 = 0x0A;
const TAG_ENUM: u8 = 0x0B;
430
431// `depth` is only read when recursing into nested types; clippy does not see cross-call use.
432#[allow(clippy::only_used_in_recursion)]
433fn encode_type(out: &mut Vec<u8>, ty: &Type, depth: u32) {
434    match ty {
435        Type::Bool => out.push(TAG_BOOL),
436        Type::Int64 => out.push(TAG_INT64),
437        Type::Uint64 => out.push(TAG_UINT64),
438        Type::Float64 => out.push(TAG_FLOAT64),
439        Type::String => out.push(TAG_STRING),
440        Type::Bytes => out.push(TAG_BYTES),
441        Type::Uuid => out.push(TAG_UUID),
442        Type::Timestamp => out.push(TAG_TIMESTAMP),
443        Type::Optional(inner) => {
444            out.push(TAG_OPTIONAL);
445            encode_type(out, inner, depth + 1);
446        }
447        Type::List(inner) => {
448            out.push(TAG_LIST);
449            encode_type(out, inner, depth + 1);
450        }
451        Type::Object(fields) => {
452            out.push(TAG_OBJECT);
453            out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
454            for f in fields {
455                encode_field_path(out, &f.path);
456                encode_type(out, &f.ty, depth + 1);
457            }
458        }
459        Type::Enum(variants) => {
460            out.push(TAG_ENUM);
461            out.extend_from_slice(&(variants.len() as u32).to_le_bytes());
462            for s in variants {
463                let b = s.as_bytes();
464                out.extend_from_slice(&(b.len() as u32).to_le_bytes());
465                out.extend_from_slice(b);
466            }
467        }
468    }
469}
470
471fn decode_type(cur: &mut Cursor<'_>, depth: u32) -> Result<Type, DbError> {
472    if depth > MAX_TYPE_NESTING_DEPTH {
473        return Err(CatalogDecodeError::TypeNestingTooDeep {
474            max: MAX_TYPE_NESTING_DEPTH,
475        }
476        .into());
477    }
478    let tag = cur.take_u8()?;
479    Ok(match tag {
480        TAG_BOOL => Type::Bool,
481        TAG_INT64 => Type::Int64,
482        TAG_UINT64 => Type::Uint64,
483        TAG_FLOAT64 => Type::Float64,
484        TAG_STRING => Type::String,
485        TAG_BYTES => Type::Bytes,
486        TAG_UUID => Type::Uuid,
487        TAG_TIMESTAMP => Type::Timestamp,
488        TAG_OPTIONAL => Type::Optional(Box::new(decode_type(cur, depth + 1)?)),
489        TAG_LIST => Type::List(Box::new(decode_type(cur, depth + 1)?)),
490        TAG_OBJECT => {
491            let n = cur.take_u32()? as usize;
492            let mut fields = Vec::with_capacity(n.min(1024));
493            for _ in 0..n {
494                let path = decode_field_path(cur)?;
495                let ty = decode_type(cur, depth + 1)?;
496                fields.push(FieldDef {
497                    path,
498                    ty,
499                    constraints: Vec::new(),
500                });
501            }
502            Type::Object(fields)
503        }
504        TAG_ENUM => {
505            let n = cur.take_u32()? as usize;
506            let mut variants = Vec::with_capacity(n.min(1024));
507            for _ in 0..n {
508                let len = cur.take_u32()? as usize;
509                let bytes = cur.take_bytes(len)?;
510                let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
511                variants.push(s);
512            }
513            Type::Enum(variants)
514        }
515        _ => {
516            return Err(DbError::Format(FormatError::InvalidCatalogPayload {
517                message: format!("unknown type tag {tag}"),
518            }))
519        }
520    })
521}
522
523struct Cursor<'a> {
524    bytes: &'a [u8],
525    pos: usize,
526}
527
528impl<'a> Cursor<'a> {
529    fn new(bytes: &'a [u8]) -> Self {
530        Self { bytes, pos: 0 }
531    }
532
533    fn remaining(&self) -> usize {
534        self.bytes.len().saturating_sub(self.pos)
535    }
536
537    fn take_u8(&mut self) -> Result<u8, DbError> {
538        if self.pos >= self.bytes.len() {
539            return Err(CatalogDecodeError::UnexpectedEof.into());
540        }
541        let b = self.bytes[self.pos];
542        self.pos += 1;
543        Ok(b)
544    }
545
546    fn take_u16(&mut self) -> Result<u16, DbError> {
547        if self.remaining() < 2 {
548            return Err(CatalogDecodeError::UnexpectedEof.into());
549        }
550        let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
551        self.pos += 2;
552        Ok(v)
553    }
554
555    fn take_u32(&mut self) -> Result<u32, DbError> {
556        if self.remaining() < 4 {
557            return Err(CatalogDecodeError::UnexpectedEof.into());
558        }
559        let v = u32::from_le_bytes([
560            self.bytes[self.pos],
561            self.bytes[self.pos + 1],
562            self.bytes[self.pos + 2],
563            self.bytes[self.pos + 3],
564        ]);
565        self.pos += 4;
566        Ok(v)
567    }
568
569    fn take_u64(&mut self) -> Result<u64, DbError> {
570        if self.remaining() < 8 {
571            return Err(CatalogDecodeError::UnexpectedEof.into());
572        }
573        let mut b = [0u8; 8];
574        b.copy_from_slice(&self.bytes[self.pos..self.pos + 8]);
575        self.pos += 8;
576        Ok(u64::from_le_bytes(b))
577    }
578
579    fn take_i64(&mut self) -> Result<i64, DbError> {
580        Ok(self.take_u64()? as i64)
581    }
582
583    fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
584        if self.remaining() < n {
585            return Err(CatalogDecodeError::UnexpectedEof.into());
586        }
587        let slice = &self.bytes[self.pos..self.pos + n];
588        self.pos += n;
589        Ok(slice.to_vec())
590    }
591}
592
#[cfg(test)]
mod tests {
    // The unit tests live in a separate file under `tests/unit/` and are textually included
    // here. They therefore compile as a child module of this file and can reach its private
    // items through `super::`.
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_catalog_codec_tests.rs"));
}