modelvault_core/record/
payload_v2.rs1use std::collections::BTreeMap;
4
5use crate::error::{DbError, FormatError};
6use crate::record::payload_v1::{DecodedRecord, OP_DELETE};
7use crate::record::payload_v3::decode_record_payload_any;
8use crate::record::row_value::{decode_row_value, encode_row_value, RowValue};
9use crate::record::scalar::{decode_tagged_scalar, encode_tagged_scalar, Cursor, ScalarValue};
10use crate::schema::{FieldDef, Type};
11
/// Version tag of the v2 record payload layout; the encoder writes it as the
/// leading little-endian `u16` of every v2 payload.
pub const RECORD_PAYLOAD_VERSION_V2: u16 = 2;
13
14pub fn encode_record_payload_v2(
16 collection_id: u32,
17 schema_version: u32,
18 pk: &ScalarValue,
19 pk_ty: &Type,
20 non_pk_ordered: &[(FieldDef, RowValue)],
21) -> Result<Vec<u8>, DbError> {
22 encode_record_payload_v2_op(
23 collection_id,
24 schema_version,
25 crate::record::payload_v1::OP_INSERT,
26 pk,
27 pk_ty,
28 non_pk_ordered,
29 )
30}
31
32pub fn encode_record_payload_v2_op(
34 collection_id: u32,
35 schema_version: u32,
36 op: u8,
37 pk: &ScalarValue,
38 pk_ty: &Type,
39 non_pk_ordered: &[(FieldDef, RowValue)],
40) -> Result<Vec<u8>, DbError> {
41 let mut out = Vec::new();
42 out.extend_from_slice(&RECORD_PAYLOAD_VERSION_V2.to_le_bytes());
43 out.extend_from_slice(&collection_id.to_le_bytes());
44 out.extend_from_slice(&schema_version.to_le_bytes());
45 out.push(op);
46 encode_tagged_scalar(&mut out, pk, pk_ty)?;
47 if op == OP_DELETE {
48 out.extend_from_slice(&0u32.to_le_bytes());
49 } else {
50 out.extend_from_slice(&(non_pk_ordered.len() as u32).to_le_bytes());
51 for (def, val) in non_pk_ordered {
52 encode_row_value(&mut out, val, &def.ty)?;
53 }
54 }
55 Ok(out)
56}
57
58pub(crate) fn decode_record_payload_v2_body(
60 mut cur: Cursor<'_>,
61 pk_name: &str,
62 pk_ty: &Type,
63 fields: &[FieldDef],
64) -> Result<DecodedRecord, DbError> {
65 let collection_id = cur.take_u32()?;
66 let schema_version = cur.take_u32()?;
67 let op = cur.take_u8()?;
68 let pk = decode_tagged_scalar(&mut cur, pk_ty)?;
69 let n = cur.take_u32()? as usize;
70
71 let non_pk_defs: Vec<&FieldDef> = fields
72 .iter()
73 .filter(|f| f.path.0.len() == 1 && f.path.0[0] != pk_name)
74 .collect();
75 if op == OP_DELETE {
76 if n != 0 {
77 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
78 }
79 } else if n != non_pk_defs.len() {
80 return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
81 }
82
83 let mut out_fields = BTreeMap::new();
84 if op != OP_DELETE {
85 for def in non_pk_defs {
86 let name = def.path.0[0].to_string();
87 let v = decode_row_value(&mut cur, &def.ty)?;
88 out_fields.insert(name, v);
89 }
90 }
91 if cur.remaining() != 0 {
92 return Err(DbError::Format(FormatError::TrailingRecordPayload));
93 }
94
95 Ok(DecodedRecord {
96 collection_id,
97 schema_version,
98 op,
99 pk,
100 fields: out_fields,
101 })
102}
103
104pub fn decode_record_payload(
106 bytes: &[u8],
107 pk_name: &str,
108 pk_ty: &Type,
109 fields: &[FieldDef],
110) -> Result<DecodedRecord, DbError> {
111 decode_record_payload_any(bytes, pk_name, pk_ty, fields)
113}
114
// The unit tests for this module are kept in a separate file under the crate's
// `tests/unit` directory and are textually included here, so they are only
// compiled in test builds.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_record_payload_v2_tests.rs"
    ));
}