1use std::borrow::Cow;
4
5use crate::error::{DbError, FormatError};
6use crate::schema::{Constraint, FieldDef, FieldPath, IndexDef, IndexKind, Type};
7
/// Maximum length, in bytes, the decoder accepts for a collection name.
///
/// The decoders in this file apply the same limit to the optional primary
/// field name and to index names.
pub const MAX_COLLECTION_NAME_BYTES: usize = 1023;

/// Catalog payload version 1: top-level fields carry only a path and a type.
pub const CATALOG_PAYLOAD_VERSION_V1: u16 = 1;

/// Catalog payload version 2: `CreateCollection` records additionally carry
/// an optional primary field name.
pub const CATALOG_PAYLOAD_VERSION_V2: u16 = 2;

/// Catalog payload version 3: every top-level field additionally carries a
/// constraint list.
pub const CATALOG_PAYLOAD_VERSION_V3: u16 = 3;

/// Catalog payload version 4: both record kinds additionally carry an index
/// list.
pub const CATALOG_PAYLOAD_VERSION_V4: u16 = 4;

/// Version written by [`encode_catalog_payload`]. The decoder accepts V1
/// through V4.
pub const CATALOG_PAYLOAD_VERSION: u16 = CATALOG_PAYLOAD_VERSION_V4;

/// Entry-kind tag for [`CatalogRecordWire::CreateCollection`].
pub const ENTRY_KIND_CREATE_COLLECTION: u16 = 1;
/// Entry-kind tag for [`CatalogRecordWire::NewSchemaVersion`].
pub const ENTRY_KIND_NEW_SCHEMA_VERSION: u16 = 2;

/// Largest nesting depth the type decoder accepts. A top-level field type is
/// depth 0, so deeper descriptors fail with
/// [`CatalogDecodeError::TypeNestingTooDeep`].
pub const MAX_TYPE_NESTING_DEPTH: u32 = 32;

/// Maximum number of top-level fields the decoder accepts in one schema.
pub const MAX_FIELDS_PER_SCHEMA: u32 = 4096;
30
/// Structural errors detected while decoding a catalog payload.
///
/// Converted into `DbError::Format(FormatError::InvalidCatalogPayload { .. })`
/// by `From<CatalogDecodeError> for DbError`, with the variant's `Debug`
/// rendering as the message.
#[derive(Debug, Clone, PartialEq)]
pub enum CatalogDecodeError {
    /// The input ended before a fixed-width value or a length-prefixed byte
    /// run could be fully read.
    UnexpectedEof,
    /// The leading `u16` is not one of the supported payload versions (V1-V4).
    UnknownCatalogPayloadVersion { got: u16 },
    /// The entry-kind `u16` matches no `ENTRY_KIND_*` constant.
    UnknownEntryKind { got: u16 },
    /// Bytes remained after a complete record was decoded.
    TrailingBytes,
    /// A type descriptor nested deeper than `max` levels.
    TypeNestingTooDeep { max: u32 },
    /// A string field was not valid UTF-8.
    InvalidUtf8,
    /// A collection name (or primary field name) length exceeded
    /// `MAX_COLLECTION_NAME_BYTES`.
    CollectionNameTooLong { got: usize },
    /// A collection name had a zero length prefix.
    EmptyCollectionName,
    /// Not constructed by the decode path visible in this module.
    /// NOTE(review): presumably raised by callers that validate the schema
    /// version of a `CreateCollection` record — confirm.
    InvalidCreateSchemaVersion { got: u32 },
    /// An index name length exceeded `MAX_COLLECTION_NAME_BYTES`.
    IndexNameTooLong { got: usize },
    /// An index name had a zero length prefix.
    EmptyIndexName,
    /// An index kind byte was neither `1` (unique) nor `2` (non-unique).
    UnknownIndexKind { got: u8 },
}
46
47impl From<CatalogDecodeError> for DbError {
48 fn from(e: CatalogDecodeError) -> Self {
49 DbError::Format(FormatError::InvalidCatalogPayload {
50 message: format!("{e:?}"),
51 })
52 }
53}
54
55pub fn encode_catalog_payload(record: &CatalogRecordWire) -> Vec<u8> {
57 let mut out = Vec::new();
58 out.extend_from_slice(&CATALOG_PAYLOAD_VERSION.to_le_bytes());
59 match record {
60 CatalogRecordWire::CreateCollection {
61 collection_id,
62 name,
63 schema_version,
64 fields,
65 indexes,
66 primary_field,
67 } => {
68 out.extend_from_slice(&ENTRY_KIND_CREATE_COLLECTION.to_le_bytes());
69 out.extend_from_slice(&collection_id.to_le_bytes());
70 encode_name(&mut out, name);
71 out.extend_from_slice(&schema_version.to_le_bytes());
72 encode_fields_v3(&mut out, fields);
73 encode_indexes(&mut out, indexes);
74 encode_optional_primary_name(&mut out, primary_field.as_deref());
75 }
76 CatalogRecordWire::NewSchemaVersion {
77 collection_id,
78 schema_version,
79 fields,
80 indexes,
81 } => {
82 out.extend_from_slice(&ENTRY_KIND_NEW_SCHEMA_VERSION.to_le_bytes());
83 out.extend_from_slice(&collection_id.to_le_bytes());
84 out.extend_from_slice(&schema_version.to_le_bytes());
85 encode_fields_v3(&mut out, fields);
86 encode_indexes(&mut out, indexes);
87 }
88 }
89 out
90}
91
/// One catalog entry in the form produced by `decode_catalog_payload` and
/// consumed by `encode_catalog_payload`.
#[derive(Debug, Clone, PartialEq)]
pub enum CatalogRecordWire {
    /// Entry carrying a collection's id, name, schema version, fields,
    /// indexes and optional primary field (entry kind
    /// `ENTRY_KIND_CREATE_COLLECTION`).
    CreateCollection {
        collection_id: u32,
        /// The decoder requires 1..=`MAX_COLLECTION_NAME_BYTES` bytes of UTF-8.
        name: String,
        schema_version: u32,
        fields: Vec<FieldDef>,
        /// Empty when decoded from a payload older than V4.
        indexes: Vec<IndexDef>,
        /// `None` when decoded from a V1 payload. `Some("")` encodes the same
        /// as `None`, so it decodes back as `None`.
        primary_field: Option<String>,
    },
    /// Entry carrying a further schema version for a collection id (entry kind
    /// `ENTRY_KIND_NEW_SCHEMA_VERSION`).
    NewSchemaVersion {
        collection_id: u32,
        schema_version: u32,
        fields: Vec<FieldDef>,
        /// Empty when decoded from a payload older than V4.
        indexes: Vec<IndexDef>,
    },
}
111
112pub fn decode_catalog_payload(bytes: &[u8]) -> Result<CatalogRecordWire, DbError> {
113 let mut cur = Cursor::new(bytes);
114 let ver = cur.take_u16()?;
115 if ver != CATALOG_PAYLOAD_VERSION_V1
116 && ver != CATALOG_PAYLOAD_VERSION_V2
117 && ver != CATALOG_PAYLOAD_VERSION_V3
118 && ver != CATALOG_PAYLOAD_VERSION_V4
119 {
120 return Err(CatalogDecodeError::UnknownCatalogPayloadVersion { got: ver }.into());
121 }
122 let kind = cur.take_u16()?;
123 match kind {
124 ENTRY_KIND_CREATE_COLLECTION => {
125 let collection_id = cur.take_u32()?;
126 let name = decode_name(&mut cur)?;
127 let schema_version = cur.take_u32()?;
128 let fields = decode_fields(&mut cur, ver)?;
129 let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
130 decode_indexes(&mut cur)?
131 } else {
132 Vec::new()
133 };
134 let primary_field = if ver >= CATALOG_PAYLOAD_VERSION_V2 {
135 decode_optional_primary_name(&mut cur)?
136 } else {
137 None
138 };
139 if cur.remaining() != 0 {
140 return Err(CatalogDecodeError::TrailingBytes.into());
141 }
142 Ok(CatalogRecordWire::CreateCollection {
143 collection_id,
144 name,
145 schema_version,
146 fields,
147 indexes,
148 primary_field,
149 })
150 }
151 ENTRY_KIND_NEW_SCHEMA_VERSION => {
152 let collection_id = cur.take_u32()?;
153 let schema_version = cur.take_u32()?;
154 let fields = decode_fields(&mut cur, ver)?;
155 let indexes = if ver >= CATALOG_PAYLOAD_VERSION_V4 {
156 decode_indexes(&mut cur)?
157 } else {
158 Vec::new()
159 };
160 if cur.remaining() != 0 {
161 return Err(CatalogDecodeError::TrailingBytes.into());
162 }
163 Ok(CatalogRecordWire::NewSchemaVersion {
164 collection_id,
165 schema_version,
166 fields,
167 indexes,
168 })
169 }
170 _ => Err(CatalogDecodeError::UnknownEntryKind { got: kind }.into()),
171 }
172}
173
/// Appends an optional primary field name as a `u32` little-endian byte
/// length followed by its UTF-8 bytes.
///
/// `None` is written as a bare zero length. `Some("")` produces exactly the
/// same bytes, so the decoder reads it back as `None`.
fn encode_optional_primary_name(out: &mut Vec<u8>, primary: Option<&str>) {
    let bytes: &[u8] = primary.map(str::as_bytes).unwrap_or_default();
    out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
    out.extend_from_slice(bytes);
}
184
185fn decode_optional_primary_name(cur: &mut Cursor<'_>) -> Result<Option<String>, DbError> {
186 let n = cur.take_u32()? as usize;
187 if n == 0 {
188 return Ok(None);
189 }
190 if n > MAX_COLLECTION_NAME_BYTES {
191 return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
192 }
193 let bytes = cur.take_bytes(n)?;
194 let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
195 Ok(Some(s))
196}
197
/// Appends `name` as a `u32` little-endian byte length followed by its UTF-8
/// bytes.
///
/// No validation happens here. The decoder is what rejects empty names and
/// names longer than [`MAX_COLLECTION_NAME_BYTES`].
fn encode_name(out: &mut Vec<u8>, name: &str) {
    let len_prefix = (name.len() as u32).to_le_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(name.as_bytes());
}
203
204fn decode_name(cur: &mut Cursor<'_>) -> Result<String, DbError> {
205 let n = cur.take_u32()? as usize;
206 if n == 0 {
207 return Err(CatalogDecodeError::EmptyCollectionName.into());
208 }
209 if n > MAX_COLLECTION_NAME_BYTES {
210 return Err(CatalogDecodeError::CollectionNameTooLong { got: n }.into());
211 }
212 let bytes = cur.take_bytes(n)?;
213 String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8.into())
214}
215
216fn encode_indexes(out: &mut Vec<u8>, indexes: &[IndexDef]) {
217 out.extend_from_slice(&(indexes.len() as u32).to_le_bytes());
218 for idx in indexes {
219 match idx.kind {
220 IndexKind::Unique => out.push(1),
221 IndexKind::NonUnique => out.push(2),
222 }
223 encode_field_path(out, &idx.path);
224 let b = idx.name.as_bytes();
225 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
226 out.extend_from_slice(b);
227 }
228}
229
230fn decode_indexes(cur: &mut Cursor<'_>) -> Result<Vec<IndexDef>, DbError> {
231 let n = cur.take_u32()? as usize;
232 let mut v = Vec::with_capacity(n.min(1024));
233 for _ in 0..n {
234 let kind_tag = cur.take_u8()?;
235 let kind = match kind_tag {
236 1 => IndexKind::Unique,
237 2 => IndexKind::NonUnique,
238 _ => return Err(CatalogDecodeError::UnknownIndexKind { got: kind_tag }.into()),
239 };
240 let path = decode_field_path(cur)?;
241 let name_len = cur.take_u32()? as usize;
242 if name_len == 0 {
243 return Err(CatalogDecodeError::EmptyIndexName.into());
244 }
245 if name_len > MAX_COLLECTION_NAME_BYTES {
246 return Err(CatalogDecodeError::IndexNameTooLong { got: name_len }.into());
247 }
248 let bytes = cur.take_bytes(name_len)?;
249 let name = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
250 v.push(IndexDef { name, path, kind });
251 }
252 Ok(v)
253}
254
255fn encode_fields_v3(out: &mut Vec<u8>, fields: &[FieldDef]) {
256 out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
257 for f in fields {
258 encode_field_path(out, &f.path);
259 encode_type(out, &f.ty, 0);
260 encode_constraints(out, &f.constraints);
261 }
262}
263
264fn decode_fields(cur: &mut Cursor<'_>, catalog_ver: u16) -> Result<Vec<FieldDef>, DbError> {
265 let n = cur.take_u32()?;
266 if n > MAX_FIELDS_PER_SCHEMA {
267 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
268 message: format!("field count {n} exceeds maximum {MAX_FIELDS_PER_SCHEMA}"),
269 }));
270 }
271 let n = n as usize;
272 let mut v = Vec::with_capacity(n.min(1024));
273 for _ in 0..n {
274 let path = decode_field_path(cur)?;
275 let ty = decode_type(cur, 0)?;
276 let constraints = if catalog_ver >= CATALOG_PAYLOAD_VERSION_V3 {
277 decode_constraints(cur)?
278 } else {
279 Vec::new()
280 };
281 v.push(FieldDef {
282 path,
283 ty,
284 constraints,
285 });
286 }
287 Ok(v)
288}
289
// Constraint tags: the byte written before each constraint by
// `encode_constraints` and matched by `decode_constraints`. The decoder reads
// every numeric payload as 8 little-endian bytes (f64 bounds as their raw bit
// pattern). `CT_REGEX` is followed by a `u32` length and UTF-8 pattern bytes.
// `CT_EMAIL`, `CT_URL` and `CT_NONEMPTY` carry no payload. Renumbering any tag
// breaks decoding of payloads encoded with the old value.
const CT_MIN_I64: u8 = 1;
const CT_MAX_I64: u8 = 2;
const CT_MIN_U64: u8 = 3;
const CT_MAX_U64: u8 = 4;
const CT_MIN_F64: u8 = 5;
const CT_MAX_F64: u8 = 6;
const CT_MIN_LEN: u8 = 7;
const CT_MAX_LEN: u8 = 8;
const CT_REGEX: u8 = 9;
const CT_EMAIL: u8 = 10;
const CT_URL: u8 = 11;
const CT_NONEMPTY: u8 = 12;
302
303fn encode_constraints(out: &mut Vec<u8>, c: &[Constraint]) {
304 out.extend_from_slice(&(c.len() as u32).to_le_bytes());
305 for x in c {
306 match x {
307 Constraint::MinI64(n) => {
308 out.push(CT_MIN_I64);
309 out.extend_from_slice(&n.to_le_bytes());
310 }
311 Constraint::MaxI64(n) => {
312 out.push(CT_MAX_I64);
313 out.extend_from_slice(&n.to_le_bytes());
314 }
315 Constraint::MinU64(n) => {
316 out.push(CT_MIN_U64);
317 out.extend_from_slice(&n.to_le_bytes());
318 }
319 Constraint::MaxU64(n) => {
320 out.push(CT_MAX_U64);
321 out.extend_from_slice(&n.to_le_bytes());
322 }
323 Constraint::MinF64(n) => {
324 out.push(CT_MIN_F64);
325 out.extend_from_slice(&n.to_le_bytes());
326 }
327 Constraint::MaxF64(n) => {
328 out.push(CT_MAX_F64);
329 out.extend_from_slice(&n.to_le_bytes());
330 }
331 Constraint::MinLength(n) => {
332 out.push(CT_MIN_LEN);
333 out.extend_from_slice(&n.to_le_bytes());
334 }
335 Constraint::MaxLength(n) => {
336 out.push(CT_MAX_LEN);
337 out.extend_from_slice(&n.to_le_bytes());
338 }
339 Constraint::Regex(s) => {
340 out.push(CT_REGEX);
341 let b = s.as_bytes();
342 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
343 out.extend_from_slice(b);
344 }
345 Constraint::Email => out.push(CT_EMAIL),
346 Constraint::Url => out.push(CT_URL),
347 Constraint::NonEmpty => out.push(CT_NONEMPTY),
348 }
349 }
350}
351
352fn decode_constraints(cur: &mut Cursor<'_>) -> Result<Vec<Constraint>, DbError> {
353 let n = cur.take_u32()? as usize;
354 let mut v = Vec::with_capacity(n.min(4096));
355 for _ in 0..n {
356 let tag = cur.take_u8()?;
357 let c = match tag {
358 CT_MIN_I64 => Constraint::MinI64(cur.take_i64()?),
359 CT_MAX_I64 => Constraint::MaxI64(cur.take_i64()?),
360 CT_MIN_U64 => Constraint::MinU64(cur.take_u64()?),
361 CT_MAX_U64 => Constraint::MaxU64(cur.take_u64()?),
362 CT_MIN_F64 => Constraint::MinF64(f64::from_bits(cur.take_u64()?)),
363 CT_MAX_F64 => Constraint::MaxF64(f64::from_bits(cur.take_u64()?)),
364 CT_MIN_LEN => Constraint::MinLength(cur.take_u64()?),
365 CT_MAX_LEN => Constraint::MaxLength(cur.take_u64()?),
366 CT_REGEX => {
367 let len = cur.take_u32()? as usize;
368 let bytes = cur.take_bytes(len)?;
369 Constraint::Regex(
370 String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?,
371 )
372 }
373 CT_EMAIL => Constraint::Email,
374 CT_URL => Constraint::Url,
375 CT_NONEMPTY => Constraint::NonEmpty,
376 _ => {
377 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
378 message: format!("unknown constraint tag {tag}"),
379 }))
380 }
381 };
382 v.push(c);
383 }
384 Ok(v)
385}
386
387fn encode_field_path(out: &mut Vec<u8>, path: &FieldPath) {
388 let parts = &path.0;
389 out.extend_from_slice(&(parts.len() as u32).to_le_bytes());
390 for p in parts {
391 let b = p.as_bytes();
392 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
393 out.extend_from_slice(b);
394 }
395}
396
397fn decode_field_path(cur: &mut Cursor<'_>) -> Result<FieldPath, DbError> {
398 let n = cur.take_u32()? as usize;
399 if n == 0 {
400 return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
401 }
402 let mut parts = Vec::with_capacity(n.min(64));
403 for _ in 0..n {
404 let len = cur.take_u32()? as usize;
405 let bytes = cur.take_bytes(len)?;
406 let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
407 if s.is_empty() {
408 return Err(DbError::Schema(crate::error::SchemaError::InvalidFieldPath));
409 }
410 parts.push(Cow::Owned(s));
411 }
412 Ok(FieldPath(parts))
413}
414
// Type-descriptor tags: the first byte of every type written by `encode_type`
// and matched by `decode_type`. `TAG_OPTIONAL` and `TAG_LIST` are followed by
// one nested descriptor. `TAG_OBJECT` and `TAG_ENUM` are followed by a `u32`
// element count. Renumbering any tag breaks decoding of payloads encoded with
// the old value.
const TAG_BOOL: u8 = 0;
const TAG_INT64: u8 = 1;
const TAG_UINT64: u8 = 2;
const TAG_FLOAT64: u8 = 3;
const TAG_STRING: u8 = 4;
const TAG_BYTES: u8 = 5;
const TAG_UUID: u8 = 6;
const TAG_TIMESTAMP: u8 = 7;
const TAG_OPTIONAL: u8 = 8;
const TAG_LIST: u8 = 9;
const TAG_OBJECT: u8 = 10;
const TAG_ENUM: u8 = 11;
427
428#[allow(clippy::only_used_in_recursion)]
430fn encode_type(out: &mut Vec<u8>, ty: &Type, depth: u32) {
431 match ty {
432 Type::Bool => out.push(TAG_BOOL),
433 Type::Int64 => out.push(TAG_INT64),
434 Type::Uint64 => out.push(TAG_UINT64),
435 Type::Float64 => out.push(TAG_FLOAT64),
436 Type::String => out.push(TAG_STRING),
437 Type::Bytes => out.push(TAG_BYTES),
438 Type::Uuid => out.push(TAG_UUID),
439 Type::Timestamp => out.push(TAG_TIMESTAMP),
440 Type::Optional(inner) => {
441 out.push(TAG_OPTIONAL);
442 encode_type(out, inner, depth + 1);
443 }
444 Type::List(inner) => {
445 out.push(TAG_LIST);
446 encode_type(out, inner, depth + 1);
447 }
448 Type::Object(fields) => {
449 out.push(TAG_OBJECT);
450 out.extend_from_slice(&(fields.len() as u32).to_le_bytes());
451 for f in fields {
452 encode_field_path(out, &f.path);
453 encode_type(out, &f.ty, depth + 1);
454 }
455 }
456 Type::Enum(variants) => {
457 out.push(TAG_ENUM);
458 out.extend_from_slice(&(variants.len() as u32).to_le_bytes());
459 for s in variants {
460 let b = s.as_bytes();
461 out.extend_from_slice(&(b.len() as u32).to_le_bytes());
462 out.extend_from_slice(b);
463 }
464 }
465 }
466}
467
468fn decode_type(cur: &mut Cursor<'_>, depth: u32) -> Result<Type, DbError> {
469 if depth > MAX_TYPE_NESTING_DEPTH {
470 return Err(CatalogDecodeError::TypeNestingTooDeep {
471 max: MAX_TYPE_NESTING_DEPTH,
472 }
473 .into());
474 }
475 let tag = cur.take_u8()?;
476 Ok(match tag {
477 TAG_BOOL => Type::Bool,
478 TAG_INT64 => Type::Int64,
479 TAG_UINT64 => Type::Uint64,
480 TAG_FLOAT64 => Type::Float64,
481 TAG_STRING => Type::String,
482 TAG_BYTES => Type::Bytes,
483 TAG_UUID => Type::Uuid,
484 TAG_TIMESTAMP => Type::Timestamp,
485 TAG_OPTIONAL => Type::Optional(Box::new(decode_type(cur, depth + 1)?)),
486 TAG_LIST => Type::List(Box::new(decode_type(cur, depth + 1)?)),
487 TAG_OBJECT => {
488 let n = cur.take_u32()? as usize;
489 let mut fields = Vec::with_capacity(n.min(1024));
490 for _ in 0..n {
491 let path = decode_field_path(cur)?;
492 let ty = decode_type(cur, depth + 1)?;
493 fields.push(FieldDef {
494 path,
495 ty,
496 constraints: Vec::new(),
497 });
498 }
499 Type::Object(fields)
500 }
501 TAG_ENUM => {
502 let n = cur.take_u32()? as usize;
503 let mut variants = Vec::with_capacity(n.min(1024));
504 for _ in 0..n {
505 let len = cur.take_u32()? as usize;
506 let bytes = cur.take_bytes(len)?;
507 let s = String::from_utf8(bytes).map_err(|_| CatalogDecodeError::InvalidUtf8)?;
508 variants.push(s);
509 }
510 Type::Enum(variants)
511 }
512 _ => {
513 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
514 message: format!("unknown type tag {tag}"),
515 }))
516 }
517 })
518}
519
520struct Cursor<'a> {
521 bytes: &'a [u8],
522 pos: usize,
523}
524
525impl<'a> Cursor<'a> {
526 fn new(bytes: &'a [u8]) -> Self {
527 Self { bytes, pos: 0 }
528 }
529
530 fn remaining(&self) -> usize {
531 self.bytes.len().saturating_sub(self.pos)
532 }
533
534 fn take_u8(&mut self) -> Result<u8, DbError> {
535 if self.pos >= self.bytes.len() {
536 return Err(CatalogDecodeError::UnexpectedEof.into());
537 }
538 let b = self.bytes[self.pos];
539 self.pos += 1;
540 Ok(b)
541 }
542
543 fn take_u16(&mut self) -> Result<u16, DbError> {
544 if self.remaining() < 2 {
545 return Err(CatalogDecodeError::UnexpectedEof.into());
546 }
547 let v = u16::from_le_bytes([self.bytes[self.pos], self.bytes[self.pos + 1]]);
548 self.pos += 2;
549 Ok(v)
550 }
551
552 fn take_u32(&mut self) -> Result<u32, DbError> {
553 if self.remaining() < 4 {
554 return Err(CatalogDecodeError::UnexpectedEof.into());
555 }
556 let v = u32::from_le_bytes([
557 self.bytes[self.pos],
558 self.bytes[self.pos + 1],
559 self.bytes[self.pos + 2],
560 self.bytes[self.pos + 3],
561 ]);
562 self.pos += 4;
563 Ok(v)
564 }
565
566 fn take_u64(&mut self) -> Result<u64, DbError> {
567 if self.remaining() < 8 {
568 return Err(CatalogDecodeError::UnexpectedEof.into());
569 }
570 let mut b = [0u8; 8];
571 b.copy_from_slice(&self.bytes[self.pos..self.pos + 8]);
572 self.pos += 8;
573 Ok(u64::from_le_bytes(b))
574 }
575
576 fn take_i64(&mut self) -> Result<i64, DbError> {
577 Ok(self.take_u64()? as i64)
578 }
579
580 fn take_bytes(&mut self, n: usize) -> Result<Vec<u8>, DbError> {
581 if self.remaining() < n {
582 return Err(CatalogDecodeError::UnexpectedEof.into());
583 }
584 let slice = &self.bytes[self.pos..self.pos + n];
585 self.pos += n;
586 Ok(slice.to_vec())
587 }
588}
589
// Unit tests live in a separate file that is textually included into this
// child module. This presumably lets them reach this file's private items
// through `super::`.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_catalog_codec_tests.rs"
    ));
}