1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use super::Data;
use super::Event;
use super::{Attributes, AttributesReader};
use crate::event::SpecVersion;
use crate::message::{
    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredDeserializer,
    StructuredSerializer,
};

impl StructuredDeserializer for Event {
    fn deserialize_structured<R, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
        let vec: Vec<u8> = serde_json::to_vec(&self)?;
        visitor.set_structured_event(vec)
    }
}

impl BinaryDeserializer for Event {
    fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
        visitor = visitor.set_spec_version(self.get_specversion())?;
        visitor = self.attributes.deserialize_attributes(visitor)?;
        for (k, v) in self.extensions.into_iter() {
            visitor = visitor.set_extension(&k, v.into())?;
        }
        match self.data {
            Some(Data::String(s)) => visitor.end_with_data(s.into_bytes()),
            Some(Data::Binary(v)) => visitor.end_with_data(v),
            Some(Data::Json(j)) => {
                let vec: Vec<u8> = serde_json::to_vec(&j)?;
                visitor.end_with_data(vec)
            }
            None => visitor.end(),
        }
    }
}

pub(crate) trait AttributesDeserializer {
    fn deserialize_attributes<R: Sized, V: BinarySerializer<R>>(self, visitor: V) -> Result<V>;
}

pub(crate) trait AttributesSerializer {
    fn serialize_attribute(&mut self, name: &str, value: MessageAttributeValue) -> Result<()>;
}

impl AttributesDeserializer for Attributes {
    fn deserialize_attributes<R: Sized, V: BinarySerializer<R>>(self, visitor: V) -> Result<V> {
        match self {
            Attributes::V03(v03) => v03.deserialize_attributes(visitor),
            Attributes::V10(v10) => v10.deserialize_attributes(visitor),
        }
    }
}

impl AttributesSerializer for Attributes {
    fn serialize_attribute(&mut self, name: &str, value: MessageAttributeValue) -> Result<()> {
        match self {
            Attributes::V03(v03) => v03.serialize_attribute(name, value),
            Attributes::V10(v10) => v10.serialize_attribute(name, value),
        }
    }
}

impl StructuredSerializer<Event> for Event {
    fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<Event> {
        let new_event: Event = serde_json::from_slice(&bytes)?;
        self.attributes = new_event.attributes;
        self.data = new_event.data;
        self.extensions = new_event.extensions;
        Ok(self)
    }
}

impl BinarySerializer<Event> for Event {
    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
        match spec_version {
            SpecVersion::V03 => self.attributes = self.attributes.clone().into_v03(),
            SpecVersion::V10 => self.attributes = self.attributes.clone().into_v10(),
        }
        Ok(self)
    }

    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
        self.attributes.serialize_attribute(name, value)?;
        Ok(self)
    }

    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
        self.extensions.insert(name.to_string(), value.into());
        Ok(self)
    }

    fn end_with_data(mut self, bytes: Vec<u8>) -> Result<Event> {
        self.data = Some(Data::from_binary(self.get_datacontenttype(), bytes)?);
        Ok(self)
    }

    fn end(self) -> Result<Event> {
        Ok(self)
    }
}