Skip to main content

modelvault_core/record/
payload_v2.rs

1//! Record segment payload version 2 (composite values). See `docs/07_record_encoding_v2.md`.
2
3use std::collections::BTreeMap;
4
5use crate::error::{DbError, FormatError};
6use crate::record::payload_v1::{DecodedRecord, OP_DELETE};
7use crate::record::payload_v3::decode_record_payload_any;
8use crate::record::row_value::{decode_row_value, encode_row_value, RowValue};
9use crate::record::scalar::{decode_tagged_scalar, encode_tagged_scalar, Cursor, ScalarValue};
10use crate::schema::{FieldDef, Type};
11
12pub const RECORD_PAYLOAD_VERSION_V2: u16 = 2;
13
14/// Encode a record segment body (version 2). Preferred for all new inserts (0.6+).
15pub fn encode_record_payload_v2(
16    collection_id: u32,
17    schema_version: u32,
18    pk: &ScalarValue,
19    pk_ty: &Type,
20    non_pk_ordered: &[(FieldDef, RowValue)],
21) -> Result<Vec<u8>, DbError> {
22    encode_record_payload_v2_op(
23        collection_id,
24        schema_version,
25        crate::record::payload_v1::OP_INSERT,
26        pk,
27        pk_ty,
28        non_pk_ordered,
29    )
30}
31
32/// Encode a record segment body (version 2) with an explicit operation code.
33pub fn encode_record_payload_v2_op(
34    collection_id: u32,
35    schema_version: u32,
36    op: u8,
37    pk: &ScalarValue,
38    pk_ty: &Type,
39    non_pk_ordered: &[(FieldDef, RowValue)],
40) -> Result<Vec<u8>, DbError> {
41    let mut out = Vec::new();
42    out.extend_from_slice(&RECORD_PAYLOAD_VERSION_V2.to_le_bytes());
43    out.extend_from_slice(&collection_id.to_le_bytes());
44    out.extend_from_slice(&schema_version.to_le_bytes());
45    out.push(op);
46    encode_tagged_scalar(&mut out, pk, pk_ty)?;
47    if op == OP_DELETE {
48        out.extend_from_slice(&0u32.to_le_bytes());
49    } else {
50        out.extend_from_slice(&(non_pk_ordered.len() as u32).to_le_bytes());
51        for (def, val) in non_pk_ordered {
52            encode_row_value(&mut out, val, &def.ty)?;
53        }
54    }
55    Ok(out)
56}
57
58/// Decode v2 body (cursor after version `u16`).
59pub(crate) fn decode_record_payload_v2_body(
60    mut cur: Cursor<'_>,
61    pk_name: &str,
62    pk_ty: &Type,
63    fields: &[FieldDef],
64) -> Result<DecodedRecord, DbError> {
65    let collection_id = cur.take_u32()?;
66    let schema_version = cur.take_u32()?;
67    let op = cur.take_u8()?;
68    let pk = decode_tagged_scalar(&mut cur, pk_ty)?;
69    let n = cur.take_u32()? as usize;
70
71    let non_pk_defs: Vec<&FieldDef> = fields
72        .iter()
73        .filter(|f| f.path.0.len() == 1 && f.path.0[0] != pk_name)
74        .collect();
75    if op == OP_DELETE {
76        if n != 0 {
77            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
78        }
79    } else if n != non_pk_defs.len() {
80        return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
81    }
82
83    let mut out_fields = BTreeMap::new();
84    if op != OP_DELETE {
85        for def in non_pk_defs {
86            let name = def.path.0[0].to_string();
87            let v = decode_row_value(&mut cur, &def.ty)?;
88            out_fields.insert(name, v);
89        }
90    }
91    if cur.remaining() != 0 {
92        return Err(DbError::Format(FormatError::TrailingRecordPayload));
93    }
94
95    Ok(DecodedRecord {
96        collection_id,
97        schema_version,
98        op,
99        pk,
100        fields: out_fields,
101    })
102}
103
104/// Decode either v1 or v2 record payload.
105pub fn decode_record_payload(
106    bytes: &[u8],
107    pk_name: &str,
108    pk_ty: &Type,
109    fields: &[FieldDef],
110) -> Result<DecodedRecord, DbError> {
111    // Delegate to the central v1/v2/v3 dispatcher.
112    decode_record_payload_any(bytes, pk_name, pk_ty, fields)
113}
114
115#[cfg(test)]
116mod tests {
117    include!(concat!(
118        env!("CARGO_MANIFEST_DIR"),
119        "/tests/unit/src_record_payload_v2_tests.rs"
120    ));
121}