flow_record_common/
record_pack.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::io::Cursor;

use rmpv::Value;

use crate::{Error, ObjectType, FlowRecord};

pub struct RecordPack(Value);

impl RecordPack {
    pub fn with_descriptor(descriptor: Value) -> Self {
        Self(Value::Array(vec![
            ObjectType::RecordPackTypeDescriptor.into(),
            descriptor,
        ]))
    }

    pub fn with_record<R>(record: R) -> Self
    where
        R: FlowRecord,
    {
        Self(Value::Array(vec![
            ObjectType::RecordPackTypeRecord.into(),
            Value::Array(vec![
                Value::Array(vec![
                    Value::String(R::name().into()),
                    Value::Integer(R::descriptor_hash().into()),
                ]),
                record.into_value(),
            ]),
        ]))
    }

    pub fn inner(&self) -> &Value {
        &self.0
    }
}

impl TryFrom<RecordPack> for Value {
    type Error = rmpv::encode::Error;

    fn try_from(value: RecordPack) -> Result<Self, Self::Error> {
        let mut buffer = Vec::new();
        rmpv::encode::write_value(&mut buffer, &value.0)?;

        Ok(Value::Ext(ObjectType::RecordTypeExt as i8, buffer))
    }
}

impl TryFrom<Value> for RecordPack {
    type Error = crate::Error;

    fn try_from(value: Value) -> Result<Self, Self::Error> {
        match value {
            Value::Ext(type_id, vec) => {
                if type_id == ObjectType::RecordTypeExt as i8 {
                    let payload = rmpv::decode::read_value(&mut Cursor::new(vec))?;
                    Ok(Self(payload))
                } else {
                    Err(Error::InvalidExtTypeId(type_id))
                }
            }
            _ => Err(Error::ExpectedExtValue),
        }
    }
}