1use std::borrow::Cow;
4
5use crate::error::{DbError, FormatError};
6use crate::file_format::check_decode_entry_count;
7use crate::schema::{Constraint, FieldDef, FieldPath, IndexDef, IndexKind, Type};
8
/// Upper bound, in bytes, on length-prefixed names accepted by the decoder
/// (collection names, primary-field names and index names).
pub const MAX_COLLECTION_NAME_BYTES: usize = 1023;

/// Oldest payload layout: no primary field, no constraints, no indexes.
pub const CATALOG_PAYLOAD_VERSION_V1: u16 = 1;
/// Adds the optional primary-field name to create-collection entries.
pub const CATALOG_PAYLOAD_VERSION_V2: u16 = 2;
/// Adds a constraint list after every top-level field.
pub const CATALOG_PAYLOAD_VERSION_V3: u16 = 3;
/// Adds index definitions after the field list.
pub const CATALOG_PAYLOAD_VERSION_V4: u16 = 4;
/// Version stamped by `encode_catalog_payload`; always the newest layout.
pub const CATALOG_PAYLOAD_VERSION: u16 = CATALOG_PAYLOAD_VERSION_V4;

/// Entry-kind word identifying a `CatalogRecordWire::CreateCollection`.
pub const ENTRY_KIND_CREATE_COLLECTION: u16 = 1;
/// Entry-kind word identifying a `CatalogRecordWire::NewSchemaVersion`.
pub const ENTRY_KIND_NEW_SCHEMA_VERSION: u16 = 2;

/// Deepest type nesting the decoder accepts; the top-level type is depth 0
/// and decoding fails once a nested type would sit deeper than this.
pub const MAX_TYPE_NESTING_DEPTH: u32 = 32;

/// Largest top-level field count a single schema entry may declare.
pub const MAX_FIELDS_PER_SCHEMA: u32 = 4096;
/// Structural problems found while decoding a catalog payload.
///
/// The `From` conversion into `DbError` embeds this value's `Debug`
/// rendering in the error message, so variant and field names double as
/// user-visible error text.
#[derive(Clone, Debug, PartialEq)]
pub enum CatalogDecodeError {
    /// Input ended before a fixed-width value or length-prefixed run was complete.
    UnexpectedEof,
    /// The leading version word is not one of the supported payload versions.
    UnknownCatalogPayloadVersion { got: u16 },
    /// The entry-kind word is not a recognised entry kind.
    UnknownEntryKind { got: u16 },
    /// Bytes remained after a complete record was decoded.
    TrailingBytes,
    /// A type nested deeper than the configured maximum (`max`).
    TypeNestingTooDeep { max: u32 },
    /// A length-prefixed string was not valid UTF-8.
    InvalidUtf8,
    /// A collection (or primary-field) name length exceeded the byte limit.
    CollectionNameTooLong { got: usize },
    /// A collection name was encoded with length zero.
    EmptyCollectionName,
    /// NOTE(review): not raised by the decoder in this module; presumably
    /// reported by callers validating a create entry's schema version — confirm.
    InvalidCreateSchemaVersion { got: u32 },
    /// An index name length exceeded the byte limit.
    IndexNameTooLong { got: usize },
    /// An index name was encoded with length zero.
    EmptyIndexName,
    /// An index-kind tag byte was neither unique (1) nor non-unique (2).
    UnknownIndexKind { got: u8 },
}
47
48impl From<CatalogDecodeError> for DbError {
49 fn from(e: CatalogDecodeError) -> Self {
50 DbError::Format(FormatError::InvalidCatalogPayload {
51 message: format!("{e:?}"),
52 })
53 }
54}
55
56pub fn encode_catalog_payload(record: &CatalogRecordWire) -> Vec<u8> {
58 let mut out = Vec::new();
59 out.extend_from_slice(&CATALOG_PAYLOAD_VERSION.to_le_bytes());
60 match record {
61 CatalogRecordWire::CreateCollection {
62 collection_id,
63 name,
64 schema_version,
65 fields,
66 indexes,
67 primary_field,
68 } => {
69 out.extend_from_slice(&ENTRY_KIND_CREATE_COLLECTION.to_le_bytes());
70 out.extend_from_slice(&collection_id.to_le_bytes());
71 encode_name(&mut out, name);
72 out.extend_from_slice(&schema_version.to_le_bytes());
73 encode_fields_v3(&mut out, fields);
74 encode_indexes(&mut out, indexes);
75 encode_optional_primary_name(&mut out, primary_field.as_deref());
76 }
77 CatalogRecordWire::NewSchemaVersion {
78 collection_id,
79 schema_version,
80 fields,
81 indexes,
82 } => {
83 out.extend_from_slice(&ENTRY_KIND_NEW_SCHEMA_VERSION.to_le_bytes());
84 out.extend_from_slice(&collection_id.to_le_bytes());
85 out.extend_from_slice(&schema_version.to_le_bytes());
86 encode_fields_v3(&mut out, fields);
87 encode_indexes(&mut out, indexes);
88 }
89 }
90 out
91}
92
93#[derive(Debug, Clone, PartialEq)]
95pub enum CatalogRecordWire {
96 CreateCollection {
97 collection_id: u32,
98 name: String,
99 schema_version: u32,
100 fields: Vec<FieldDef>,
101 indexes: Vec<IndexDef>,
102 primary_field: Option<String>,
104 },
105 NewSchemaVersion {
106 collection_id: u32,
107 schema_version: u32,
108 fields: Vec<FieldDef>,
109 indexes: Vec<IndexDef>,
110 },
111}
112
113pub fn decode_catalog_payload(bytes: &[u8]) -> Result<CatalogRecordWire, DbError> {
114 let mut cur = Cursor::new(bytes);
115 let ver = cur.take_u16()?;
116 if ver != CATALOG_PAYLOAD_VERSION_V1
117 && ver != CATALOG_PAYLOAD_VERSION_V2
118 && ver != CATALOG_PAYLOAD_VERSION_V3
119 && ver != CATALOG_PAYLOAD_VERSION_V4
120 {
121 return Err(CatalogDecodeError::UnknownCatalogPayloadVersion { got: ver }.into());
122 }
123 let kind = cur.take_u16()?;
124 match kind {
125 ENTRY_KIND_CREATE_COLLECTION => {
126 let collection_id = cur.take_u32()?;
127 let name = decode_name(&mut cur)?;
128 let schema_version = cur.take_u32()?;
129 let fields = decode_fields(&mut cur, ver)?;
130 let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
131 decode_indexes(&mut cur)?
132 } else {
133 Vec::new()
134 };
135 let primary_field = if ver >= CATALOG_PAYLOAD_VERSION_V2 {
136 decode_optional_primary_name(&mut cur)?
137 } else {
138 None
139 };
140 if cur.remaining() != 0 {
141 return Err(CatalogDecodeError::TrailingBytes.into());
142 }
143 Ok(CatalogRecordWire::CreateCollection {
144 collection_id,
145 name,
146 schema_version,
147 fields,
148 indexes,
149 primary_field,
150 })
151 }
152 ENTRY_KIND_NEW_SCHEMA_VERSION => {
153 let collection_id = cur.take_u32()?;
154 let schema_version = cur.take_u32()?;
155 let fields = decode_fields(&mut cur, ver)?;
156 let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
157 decode_indexes(&mut cur)?
158 } else {
159 Vec::new()
160 };
161 if cur.remaining() != 0 {
162 return Err(CatalogDecodeError::TrailingBytes.into());
163 }
164 Ok(CatalogRecordWire::NewSchemaVersion {
165 collection_id,
166 schema_version,
167 fields,
168 indexes,
169 })
170 }
171 _ => Err(CatalogDecodeError::UnknownEntryKind { got: kind }.into()),
172 }
173}
174
/// Appends an optional primary-field name as a `u32` little-endian byte
/// length followed by its UTF-8 bytes; `None` is written as length zero.
///
/// Because zero length means "absent", `Some("")` encodes identically to
/// `None` and reads back as `None`.
fn encode_optional_primary_name(out: &mut Vec<u8>, primary: Option<&str>) {
    let payload: &[u8] = primary.map(str::as_bytes).unwrap_or_default();
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(payload);
}
185
186fn decode_optional_primary_name(cur: &mut Cursor<'_>) -> Result<Option<String>, DbError> {
187 let n = cur.take_u32()? as usize;
188 if n == 0 {
189 return Ok(None);
190 }
191 if n > MAX_COLLECTION_NAME_BYTES {
192 return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
193 }
194 let bytes = cur.take_bytes(n)?;
195 let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
196 Ok(Some(s))
197}
198
/// Appends `name` as a `u32` little-endian byte length followed by its
/// UTF-8 bytes.
fn encode_name(out: &mut Vec<u8>, name: &str) {
    let raw = name.as_bytes();
    let prefix = (raw.len() as u32).to_le_bytes();
    out.reserve(prefix.len() + raw.len());
    out.extend_from_slice(&prefix);
    out.extend_from_slice(raw);
}
204
205fn decode_name(cur: &mut Cursor<'_>) -> Result<String, DbError> {
206 let n = cur.take_u32()? as usize;
207 if n == 0 {
208 return Err(CatalogDecodeError::EmptyCollectionName.into());
209 }
210 if n > MAX_COLLECTION_NAME_BYTES {
211 return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
212 }
213 let bytes = cur.take_bytes(n)?;
214 String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8.into())
215}
216
217fn encode_indexes(out: &mut Vec<u8>, indexes: &[IndexDef]) {
218 out.extend_from_slice(&(indexes.len() as u32).to_le_bytes());
219 for idx in indexes {
220 match idx.kind {
221 IndexKind::Unique => out.push(1),
222 IndexKind::NonUnique => out.push(2),
223 }
224 encode_field_path(out, &idx.path);
225 let b = idx.name.as_bytes();
226 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
227 out.extend_from_slice(b);
228 }
229}
230
231fn decode_indexes(cur: &mut Cursor<'_>) -> Result<Vec<IndexDef>, DbError> {
232 let n = cur.take_u32()? as usize;
233 check_decode_entry_count(n)?;
234 let mut v = Vec::with_capacity(n.min(1024));
235 for _ in 0..n {
236 let kind_tag = cur.take_u8()?;
237 let kind = match kind_tag {
238 1 => IndexKind::Unique,
239 2 => IndexKind::NonUnique,
240 _ => return Err(CatalogDecodeError::UnknownIndexKind { got: kind_tag }.into()),
241 };
242 let path = decode_field_path(cur)?;
243 let name_len = cur.take_u32()? as usize;
244 if name_len == 0 {
245 return Err(CatalogDecodeError::EmptyIndexName.into());
246 }
247 if name_len > MAX_COLLECTION_NAME_BYTES {
248 return Err(CatalogDecodeError::IndexNameTooLong { got: name_len }.into());
249 }
250 let bytes = cur.take_bytes(name_len)?;
251 let name = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
252 v.push(IndexDef { name, path, kind });
253 }
254 Ok(v)
255}
256
257fn encode_fields_v3(out: &mut Vec<u8>, fields: &[FieldDef]) {
258 out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
259 for f in fields {
260 encode_field_path(out, &f.path);
261 encode_type(out, &f.ty, 0);
262 encode_constraints(out, &f.constraints);
263 }
264}
265
266fn decode_fields(cur: &mut Cursor<'_>, catalog_ver: u16) -> Result<Vec<FieldDef>, DbError> {
267 let n = cur.take_u32()?;
268 if n > MAX_FIELDS_PER_SCHEMA {
269 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
270 message: format!("field count {n} exceeds maximum {MAX_FIELDS_PER_SCHEMA}"),
271 }));
272 }
273 let n = n as usize;
274 let mut v = Vec::with_capacity(n.min(1024));
275 for _ in 0..n {
276 let path = decode_field_path(cur)?;
277 let ty = decode_type(cur, 0)?;
278 let constraints = if catalog_ver >= CATALOG_PAYLOAD_VERSION_V3 {
279 decode_constraints(cur)?
280 } else {
281 Vec::new()
282 };
283 v.push(FieldDef {
284 path,
285 ty,
286 constraints,
287 });
288 }
289 Ok(v)
290}
291
// Constraint tag bytes. Numeric payloads follow as 8 little-endian bytes;
// `CT_REGEX` is followed by a `u32` length and UTF-8 pattern bytes; the
// last three tags carry no payload.
/// `Constraint::MinI64`, payload `i64`.
const CT_MIN_I64: u8 = 1;
/// `Constraint::MaxI64`, payload `i64`.
const CT_MAX_I64: u8 = 2;
/// `Constraint::MinU64`, payload `u64`.
const CT_MIN_U64: u8 = 3;
/// `Constraint::MaxU64`, payload `u64`.
const CT_MAX_U64: u8 = 4;
/// `Constraint::MinF64`, payload `f64` bit pattern.
const CT_MIN_F64: u8 = 5;
/// `Constraint::MaxF64`, payload `f64` bit pattern.
const CT_MAX_F64: u8 = 6;
/// `Constraint::MinLength`, payload `u64`.
const CT_MIN_LEN: u8 = 7;
/// `Constraint::MaxLength`, payload `u64`.
const CT_MAX_LEN: u8 = 8;
/// `Constraint::Regex`, payload length-prefixed UTF-8 pattern.
const CT_REGEX: u8 = 9;
/// `Constraint::Email`, no payload.
const CT_EMAIL: u8 = 10;
/// `Constraint::Url`, no payload.
const CT_URL: u8 = 11;
/// `Constraint::NonEmpty`, no payload.
const CT_NONEMPTY: u8 = 12;
304
305fn encode_constraints(out: &mut Vec<u8>, c: &[Constraint]) {
306 out.extend_from_slice(&(c.len() as u32).to_le_bytes());
307 for x in c {
308 match x {
309 Constraint::MinI64(n) => {
310 out.push(CT_MIN_I64);
311 out.extend_from_slice(&n.to_le_bytes());
312 }
313 Constraint::MaxI64(n) => {
314 out.push(CT_MAX_I64);
315 out.extend_from_slice(&n.to_le_bytes());
316 }
317 Constraint::MinU64(n) => {
318 out.push(CT_MIN_U64);
319 out.extend_from_slice(&n.to_le_bytes());
320 }
321 Constraint::MaxU64(n) => {
322 out.push(CT_MAX_U64);
323 out.extend_from_slice(&n.to_le_bytes());
324 }
325 Constraint::MinF64(n) => {
326 out.push(CT_MIN_F64);
327 out.extend_from_slice(&n.to_le_bytes());
328 }
329 Constraint::MaxF64(n) => {
330 out.push(CT_MAX_F64);
331 out.extend_from_slice(&n.to_le_bytes());
332 }
333 Constraint::MinLength(n) => {
334 out.push(CT_MIN_LEN);
335 out.extend_from_slice(&n.to_le_bytes());
336 }
337 Constraint::MaxLength(n) => {
338 out.push(CT_MAX_LEN);
339 out.extend_from_slice(&n.to_le_bytes());
340 }
341 Constraint::Regex(s) => {
342 out.push(CT_REGEX);
343 let b = s.as_bytes();
344 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
345 out.extend_from_slice(b);
346 }
347 Constraint::Email => out.push(CT_EMAIL),
348 Constraint::Url => out.push(CT_URL),
349 Constraint::NonEmpty => out.push(CT_NONEMPTY),
350 }
351 }
352}
353
354fn decode_constraints(cur: &mut Cursor<'_>) -> Result<Vec<Constraint>, DbError> {
355 let n = cur.take_u32()? as usize;
356 check_decode_entry_count(n)?;
357 let mut v = Vec::with_capacity(n.min(4096));
358 for _ in 0..n {
359 let tag = cur.take_u8()?;
360 let c = match tag {
361 CT_MIN_I64 => Constraint::MinI64(cur.take_i64()?),
362 CT_MAX_I64 => Constraint::MaxI64(cur.take_i64()?),
363 CT_MIN_U64 => Constraint::MinU64(cur.take_u64()?),
364 CT_MAX_U64 => Constraint::MaxU64(cur.take_u64()?),
365 CT_MIN_F64 => Constraint::MinF64(f64::from_bits(cur.take_u64()?)),
366 CT_MAX_F64 => Constraint::MaxF64(f64::from_bits(cur.take_u64()?)),
367 CT_MIN_LEN => Constraint::MinLength(cur.take_u64()?),
368 CT_MAX_LEN => Constraint::MaxLength(cur.take_u64()?),
369 CT_REGEX => {
370 let len = cur.take_u32()? as usize;
371 let bytes = cur.take_bytes(len)?;
372 Constraint::Regex(
373 String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?,
374 )
375 }
376 CT_EMAIL => Constraint::Email,
377 CT_URL => Constraint::Url,
378 CT_NONEMPTY => Constraint::NonEmpty,
379 _ => {
380 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
381 message: format!("unknown constraint tag {tag}"),
382 }))
383 }
384 };
385 v.push(c);
386 }
387 Ok(v)
388}
389
390fn encode_field_path(out: &mut Vec<u8>, path: &FieldPath) {
391 let parts = &path.0;
392 out.extend_from_slice(&(parts.len() as u32).to_le_bytes());
393 for p in parts {
394 let b = p.as_bytes();
395 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
396 out.extend_from_slice(b);
397 }
398}
399
400fn decode_field_path(cur: &mut Cursor<'_>) -> Result<FieldPath, DbError> {
401 let n = cur.take_u32()? as usize;
402 if n == 0 {
403 return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
404 }
405 let mut parts = Vec::with_capacity(n.min(64));
406 for _ in 0..n {
407 let len = cur.take_u32()? as usize;
408 let bytes = cur.take_bytes(len)?;
409 let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
410 if s.is_empty() {
411 return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
412 }
413 parts.push(Cow::Owned(s));
414 }
415 Ok(FieldPath(parts))
416}
417
// Type tag bytes written by `encode_type` and read by `decode_type`.
// Scalar tags stand alone; composite tags are followed by their payload.
/// `Type::Bool`.
const TAG_BOOL: u8 = 0;
/// `Type::Int64`.
const TAG_INT64: u8 = 1;
/// `Type::Uint64`.
const TAG_UINT64: u8 = 2;
/// `Type::Float64`.
const TAG_FLOAT64: u8 = 3;
/// `Type::String`.
const TAG_STRING: u8 = 4;
/// `Type::Bytes`.
const TAG_BYTES: u8 = 5;
/// `Type::Uuid`.
const TAG_UUID: u8 = 6;
/// `Type::Timestamp`.
const TAG_TIMESTAMP: u8 = 7;
/// `Type::Optional`, followed by the inner type.
const TAG_OPTIONAL: u8 = 8;
/// `Type::List`, followed by the element type.
const TAG_LIST: u8 = 9;
/// `Type::Object`, followed by a `u32` count of (path, type) pairs.
const TAG_OBJECT: u8 = 10;
/// `Type::Enum`, followed by a `u32` count of length-prefixed variant names.
const TAG_ENUM: u8 = 11;
430
431#[allow(clippy::only_used_in_recursion)]
433fn encode_type(out: &mut Vec<u8>, ty: &Type, depth: u32) {
434 match ty {
435 Type::Bool => out.push(TAG_BOOL),
436 Type::Int64 => out.push(TAG_INT64),
437 Type::Uint64 => out.push(TAG_UINT64),
438 Type::Float64 => out.push(TAG_FLOAT64),
439 Type::String => out.push(TAG_STRING),
440 Type::Bytes => out.push(TAG_BYTES),
441 Type::Uuid => out.push(TAG_UUID),
442 Type::Timestamp => out.push(TAG_TIMESTAMP),
443 Type::Optional(inner) => {
444 out.push(TAG_OPTIONAL);
445 encode_type(out, inner, depth + 1);
446 }
447 Type::List(inner) => {
448 out.push(TAG_LIST);
449 encode_type(out, inner, depth + 1);
450 }
451 Type::Object(fields) => {
452 out.push(TAG_OBJECT);
453 out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
454 for f in fields {
455 encode_field_path(out, &f.path);
456 encode_type(out, &f.ty, depth + 1);
457 }
458 }
459 Type::Enum(variants) => {
460 out.push(TAG_ENUM);
461 out.extend_from_slice(&(variants.len() as u32).to_le_bytes());
462 for s in variants {
463 let b = s.as_bytes();
464 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
465 out.extend_from_slice(b);
466 }
467 }
468 }
469}
470
471fn decode_type(cur: &mut Cursor<'_>, depth: u32) -> Result<Type, DbError> {
472 if depth > MAX_TYPE_NESTING_DEPTH {
473 return Err(CatalogDecodeError::TypeNestingTooDeep {
474 max: MAX_TYPE_NESTING_DEPTH,
475 }
476 .into());
477 }
478 let tag = cur.take_u8()?;
479 Ok(match tag {
480 TAG_BOOL => Type::Bool,
481 TAG_INT64 => Type::Int64,
482 TAG_UINT64 => Type::Uint64,
483 TAG_FLOAT64 => Type::Float64,
484 TAG_STRING => Type::String,
485 TAG_BYTES => Type::Bytes,
486 TAG_UUID => Type::Uuid,
487 TAG_TIMESTAMP => Type::Timestamp,
488 TAG_OPTIONAL => Type::Optional(Box::new(decode_type(cur, depth + 1)?)),
489 TAG_LIST => Type::List(Box::new(decode_type(cur, depth + 1)?)),
490 TAG_OBJECT => {
491 let n = cur.take_u32()? as usize;
492 let mut fields = Vec::with_capacity(n.min(1024));
493 for _ in 0..n {
494 let path = decode_field_path(cur)?;
495 let ty = decode_type(cur, depth + 1)?;
496 fields.push(FieldDef {
497 path,
498 ty,
499 constraints: Vec::new(),
500 });
501 }
502 Type::Object(fields)
503 }
504 TAG_ENUM => {
505 let n = cur.take_u32()? as usize;
506 let mut variants = Vec::with_capacity(n.min(1024));
507 for _ in 0..n {
508 let len = cur.take_u32()? as usize;
509 let bytes = cur.take_bytes(len)?;
510 let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
511 variants.push(s);
512 }
513 Type::Enum(variants)
514 }
515 _ => {
516 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
517 message: format!("unknown type tag {tag}"),
518 }))
519 }
520 })
521}
522
523struct Cursor<'a> {
524 bytes: &'a [u8],
525 pos: usize,
526}
527
528impl<'a> Cursor<'a> {
529 fn new(bytes: &'a [u8]) -> Self {
530 Self { bytes, pos: 0 }
531 }
532
533 fn remaining(&self) -> usize {
534 self.bytes.len().saturating_sub(self.pos)
535 }
536
537 fn take_u8(&mut self) -> Result<u8, DbError> {
538 if self.pos >= self.bytes.len() {
539 return Err(CatalogDecodeError::UnexpectedEof.into());
540 }
541 let b = self.bytes[self.pos];
542 self.pos += 1;
543 Ok(b)
544 }
545
546 fn take_u16(&mut self) -> Result<u16, DbError> {
547 if self.remaining() < 2 {
548 return Err(CatalogDecodeError::UnexpectedEof.into());
549 }
550 let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
551 self.pos += 2;
552 Ok(v)
553 }
554
555 fn take_u32(&mut self) -> Result<u32, DbError> {
556 if self.remaining() < 4 {
557 return Err(CatalogDecodeError::UnexpectedEof.into());
558 }
559 let v = u32::from_le_bytes([
560 self.bytes[self.pos],
561 self.bytes[self.pos + 1],
562 self.bytes[self.pos + 2],
563 self.bytes[self.pos + 3],
564 ]);
565 self.pos += 4;
566 Ok(v)
567 }
568
569 fn take_u64(&mut self) -> Result<u64, DbError> {
570 if self.remaining() < 8 {
571 return Err(CatalogDecodeError::UnexpectedEof.into());
572 }
573 let mut b = [0u8; 8];
574 b.copy_from_slice(&self.bytes[self.pos..self.pos + 8]);
575 self.pos += 8;
576 Ok(u64::from_le_bytes(b))
577 }
578
579 fn take_i64(&mut self) -> Result<i64, DbError> {
580 Ok(self.take_u64()? as i64)
581 }
582
583 fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
584 if self.remaining() < n {
585 return Err(CatalogDecodeError::UnexpectedEof.into());
586 }
587 let slice = &self.bytes[self.pos..self.pos + n];
588 self.pos += n;
589 Ok(slice.to_vec())
590 }
591}
592
// Unit tests live in a separate file and are textually included here,
// presumably so they can reach this module's private items via `super::`.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_catalog_codec_tests.rs"));
}
599}