Skip to main content

modelvault_core/record/
payload_v1.rs

1//! Record segment body encoding v1 ([`RECORD_PAYLOAD_VERSION`]).
2
3use std::collections::BTreeMap;
4
5use crate::error::{DbError, FormatError};
6use crate::record::row_value::RowValue;
7use crate::record::scalar::{decode_tagged_scalar, encode_tagged_scalar, Cursor, ScalarValue};
8use crate::schema::{FieldDef, Type};
9
10pub const RECORD_PAYLOAD_VERSION: u16 = 1;
11pub const OP_INSERT: u8 = 1;
12pub const OP_REPLACE: u8 = 2;
13pub const OP_DELETE: u8 = 3;
14
15#[derive(Debug, Clone, PartialEq)]
16pub struct DecodedRecord {
17    pub collection_id: u32,
18    pub schema_version: u32,
19    pub op: u8,
20    pub pk: ScalarValue,
21    /// Top-level field name -> value for non-PK columns.
22    pub fields: BTreeMap<String, RowValue>,
23}
24
25pub fn encode_record_payload_v1(
26    collection_id: u32,
27    schema_version: u32,
28    pk: &ScalarValue,
29    pk_ty: &Type,
30    non_pk_ordered: &[(FieldDef, ScalarValue)],
31) -> Result<Vec<u8>, DbError> {
32    let mut out = Vec::new();
33    out.extend_from_slice(&RECORD_PAYLOAD_VERSION.to_le_bytes());
34    out.extend_from_slice(&collection_id.to_le_bytes());
35    out.extend_from_slice(&schema_version.to_le_bytes());
36    out.push(OP_INSERT);
37    encode_tagged_scalar(&mut out, pk, pk_ty)?;
38    out.extend_from_slice(&(non_pk_ordered.len() as u32).to_le_bytes());
39    for (def, val) in non_pk_ordered {
40        encode_tagged_scalar(&mut out, val, &def.ty)?;
41    }
42    Ok(out)
43}
44
45/// Decode v1 payload body (cursor positioned **after** the `u16` version field).
46pub(crate) fn decode_record_payload_v1_body(
47    mut cur: Cursor<'_>,
48    pk_name: &str,
49    pk_ty: &Type,
50    fields: &[FieldDef],
51) -> Result<DecodedRecord, DbError> {
52    let collection_id = cur.take_u32()?;
53    let schema_version = cur.take_u32()?;
54    let op = cur.take_u8()?;
55    let pk = decode_tagged_scalar(&mut cur, pk_ty)?;
56    let n = cur.take_u32()? as usize;
57
58    let non_pk_defs: Vec<&FieldDef> = fields
59        .iter()
60        .filter(|f| f.path.0.len() == 1 && f.path.0[0] != pk_name)
61        .collect();
62    if op == OP_DELETE {
63        if n != 0 {
64            return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
65        }
66    } else if n != non_pk_defs.len() {
67        return Err(DbError::Format(FormatError::RecordPayloadTypeMismatch));
68    }
69
70    let mut out_fields = BTreeMap::new();
71    if op != OP_DELETE {
72        for def in non_pk_defs {
73            let name = def.path.0[0].to_string();
74            let v = decode_tagged_scalar(&mut cur, &def.ty)?;
75            out_fields.insert(name, RowValue::from_scalar(v));
76        }
77    }
78    if cur.remaining() != 0 {
79        return Err(DbError::Format(FormatError::TrailingRecordPayload));
80    }
81
82    Ok(DecodedRecord {
83        collection_id,
84        schema_version,
85        op,
86        pk,
87        fields: out_fields,
88    })
89}
90
91pub fn decode_record_payload_v1(
92    bytes: &[u8],
93    pk_name: &str,
94    pk_ty: &Type,
95    fields: &[FieldDef],
96) -> Result<DecodedRecord, DbError> {
97    let mut cur = Cursor::new(bytes);
98    let ver = cur.take_u16()?;
99    if ver != RECORD_PAYLOAD_VERSION {
100        return Err(DbError::Format(FormatError::UnknownRecordPayloadVersion {
101            got: ver,
102        }));
103    }
104    decode_record_payload_v1_body(cur, pk_name, pk_ty, fields)
105}