modelvault_core/record/
payload_v1.rs1use std::collections::BTreeMap;
4
5use crate::error::{DbError, FormatError};
6use crate::record::row_value::RowValue;
7use crate::record::scalar::{decode_tagged_scalar, encode_tagged_scalar, Cursor, ScalarValue};
8use crate::schema::{FieldDef, Type};
9
10pub const RECORD_PAYLOAD_VERSION: u16 = 1;
11pub const OP_INSERT: u8 = 1;
12pub const OP_REPLACE: u8 = 2;
13pub const OP_DELETE: u8 = 3;
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct DecodedRecord {
17 pub collection_id: u32,
18 pub schema_version: u32,
19 pub op: u8,
20 pub pk: ScalarValue,
21 pub fields: BTreeMap<String, RowValue>,
23}
24
25pub fn encode_record_payload_v1(
26 collection_id: u32,
27 schema_version: u32,
28 pk: &ScalarValue,
29 pk_ty: &Type,
30 non_pk_ordered: &[(FieldDef, ScalarValue)],
31) -> Result<Vec<u8>, DbError> {
32 let mut out = Vec::new();
33 out.extend_from_slice(&RECORD_PAYLOAD_VERSION.to_le_bytes());
34 out.extend_from_slice(&collection_id.to_le_bytes());
35 out.extend_from_slice(&schema_version.to_le_bytes());
36 out.push(OP_INSERT);
37 encode_tagged_scalar(&mut out, pk, pk_ty)?;
38 out.extend_from_slice(&(non_pk_ordered.len() as u32).to_le_bytes());
39 for (def, val) in non_pk_ordered {
40 encode_tagged_scalar(&mut out, val, &def.ty)?;
41 }
42 Ok(out)
43}
44
45pub(crate) fn decode_record_payload_v1_body(
47 mut cur: Cursor<'_>,
48 pk_name: &str,
49 pk_ty: &Type,
50 fields: &[FieldDef],
51) -> Result<DecodedRecord, DbError> {
52 let collection_id = cur.take_u32()?;
53 let schema_version = cur.take_u32()?;
54 let op = cur.take_u8()?;
55 let pk = decode_tagged_scalar(&mut cur, pk_ty)?;
56 let n = cur.take_u32()? as usize;
57
58 let non_pk_defs: Vec<&FieldDef> = fields
59 .iter()
60 .filter(|f| f.path.0.len() == 1 && f.path.0[0] != pk_name)
61 .collect();
62 if op == OP_DELETE {
63 if n != 0 {
64 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
65 }
66 } else if n != non_pk_defs.len() {
67 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
68 }
69
70 let mut out_fields = BTreeMap::new();
71 if op != OP_DELETE {
72 for def in non_pk_defs {
73 let name = def.path.0[0].to_string();
74 let v = decode_tagged_scalar(&mut cur, &def.ty)?;
75 out_fields.insert(name, RowValue::from_scalar(v));
76 }
77 }
78 if cur.remaining() != 0 {
79 return Err(DbError::Format(FormatError::TrailingRecordPayload));
80 }
81
82 Ok(DecodedRecord {
83 collection_id,
84 schema_version,
85 op,
86 pk,
87 fields: out_fields,
88 })
89}
90
91pub fn decode_record_payload_v1(
92 bytes: &[u8],
93 pk_name: &str,
94 pk_ty: &Type,
95 fields: &[FieldDef],
96) -> Result<DecodedRecord, DbError> {
97 let mut cur = Cursor::new(bytes);
98 let ver = cur.take_u16()?;
99 if ver != RECORD_PAYLOAD_VERSION {
100 return Err(DbError::Format(FormatError::UnknownRecordPayloadVersion {
101 got: ver,
102 }));
103 }
104 decode_record_payload_v1_body(cur, pk_name, pk_ty, fields)
105}