flow_record_common/
record_pack.rs

1use std::io::Cursor;
2
3use rmpv::Value;
4
5use crate::{Error, ObjectType, FlowRecord};
6
7pub struct RecordPack(Value);
8
9impl RecordPack {
10    pub fn with_descriptor(descriptor: Value) -> Self {
11        Self(Value::Array(vec![
12            ObjectType::RecordPackTypeDescriptor.into(),
13            descriptor,
14        ]))
15    }
16
17    pub fn with_record<R>(record: R) -> Self
18    where
19        R: FlowRecord,
20    {
21        Self(Value::Array(vec![
22            ObjectType::RecordPackTypeRecord.into(),
23            Value::Array(vec![
24                Value::Array(vec![
25                    Value::String(R::name().into()),
26                    Value::Integer(R::descriptor_hash().into()),
27                ]),
28                record.into_value(),
29            ]),
30        ]))
31    }
32
33    pub fn inner(&self) -> &Value {
34        &self.0
35    }
36}
37
38impl TryFrom<RecordPack> for Value {
39    type Error = rmpv::encode::Error;
40
41    fn try_from(value: RecordPack) -> Result<Self, Self::Error> {
42        let mut buffer = Vec::new();
43        rmpv::encode::write_value(&mut buffer, &value.0)?;
44
45        Ok(Value::Ext(ObjectType::RecordTypeExt as i8, buffer))
46    }
47}
48
49impl TryFrom<Value> for RecordPack {
50    type Error = crate::Error;
51
52    fn try_from(value: Value) -> Result<Self, Self::Error> {
53        match value {
54            Value::Ext(type_id, vec) => {
55                if type_id == ObjectType::RecordTypeExt as i8 {
56                    let payload = rmpv::decode::read_value(&mut Cursor::new(vec))?;
57                    Ok(Self(payload))
58                } else {
59                    Err(Error::InvalidExtTypeId(type_id))
60                }
61            }
62            _ => Err(Error::ExpectedExtValue),
63        }
64    }
65}